LoginSignup
0
0

More than 1 year has passed since last update.

RustでRedisを使ったバッチ調停のライブラリを作った

Posted at

目的

バッチを起動して処理を行う時に、冗長性を確保して複数台で起動しバッチを行うのは1のプロセスだけにしてほしいことがあります。PostgreSQLとDyanamoDBで既にやったことありましたが、今回はRedisでやってみました。

crate.io上のページ

以前DynamoDBで同じことをやった記事も紹介します。

AWSのECSで複数のバッチを実行した時に排他する

コード

Redis

Redisではハッシュを使っています。ハッシュのフィールドを起動したバッチの名前を指定して、値はバッチを起動した時間を保存します。
先にそのフィールドに値があるか確認して、無ければそのまま書き込みます。
ある場合は開始してから一定期間経過しているか判定して、経過していれば書き込みますが、そうでないなら、そのバッチは実行できません。

上記の処理をアトミックにRustのコードから実行するのは大変なので、Redisの内部で実行できるLuaスクリプトで処理を行いました。

batches_lock.lua
local key = KEYS[1]
local field = KEYS[2]
local start_utsms = tonumber(ARGV[1])
local lock_milli_second = tonumber(ARGV[2])
local value = redis.call('HGET', key, field)
if value then
  local current_start_utsms = tonumber(value)
  if current_start_utsms + lock_milli_second < start_utsms then
    redis.call('HSET', key, field, start_utsms)
    return "1"
  end
else
  redis.call('HSET', key, field, start_utsms)
  return "1"
end

Rust

Rustでは現在時間を取得してLuaスクリプトを呼び出しているだけです。Luaからの戻り値を見てバッチを実行してよいかを判断するbool値を返しています。

RustでRedisのLuaスクリプトを呼び出すサンプルにはちょうど良いかもしれません。Luaスクリプトではkeyとargを複数渡すことでできます。Luaスクリプト無いではKEYSとARGVでアクセスできます。keyとargの差はほとんど無いので使い分けは特に意識する必要はなさそうです。

テストは呼び出し方のサンプルになっています。

Cargo.toml
[package]
name = "redis-batches"
version = "0.1.2"
edition = "2021"

[dependencies]
once_cell = "1.15.0"
redis = { version = "0.22", features = ["script", "aio"]}

[features]
default = ["tokio-comp"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
lib.rs
use once_cell::sync::Lazy;
use redis::Script;
use std::time::SystemTime;

static LOCK_SCRIPT: Lazy<Script> =
    Lazy::new(|| Script::new(include_str!("./redis_scripts/batches_lock.lua")));

pub async fn lock(
    key: &str,
    field: &str,
    lock_milli_second: u128,
    redis_conn: &mut redis::aio::Connection,
) -> Result<bool, redis::RedisError> {
    let start_utsms = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis();
    let mut invocation = LOCK_SCRIPT.key(key);
    invocation.key(field);
    invocation.arg(start_utsms.to_string());
    invocation.arg(lock_milli_second.to_string());
    let res: Option<String> = invocation.invoke_async(redis_conn).await?;
    Ok(res.is_some())
}

#[cfg(test)]
mod tests {
    use crate::lock;

    #[tokio::test]
    async fn it_works() {
        let client = redis::Client::open("redis://127.0.0.1/").unwrap();
        let mut con = client.get_async_connection().await.unwrap();
        let res1 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(true, res1);
        let res2 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(false, res2);
        tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
        let res3 = lock("mutex", "test", 1000, &mut con).await.unwrap();
        assert_eq!(true, res3);
        tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
    }
}

まとめ

処理が終わった後にロックを解除することも考えましたがRAIIを意識してDropの中でasyncを呼び出してみたんですがうまく動作しませんでした。
定期的で起動間隔がそれなりに離れていて実行時間が短いバッチなら、特にロックを解除しなくても問題無いと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0