目的
AWSのECSのFargateを使ってEventBridge経由でバッチを実行しています。一つ困った問題がありまして、たまに起動してこないことがあります。確認したところECRからイメージが取れないことがありました。
対応方法としては、バッチを複数起動してその一つだけが実際に処理をさせることです。幸いにもECSは起動するタスクの数を制御できるので複数起動は簡単に実現できます。
残った問題は複数起動したバッチの中から一つだけ処理させる部分です。
今回これを実現するためにDynamoDBを使った排他を実現するRustのライブラリを作成しました。
DynamoDB
テーブル名は自由に指定できますが、デフォルトではmutexesになります。
mutex_codeは同期を行う対象を指定します。
mutex_statusはRUNNING, DONE, FAILEDの三つになります。
updated_atは更新した時間のUnixtimestampのミリ秒です。
コード
基本的な使い方
mutexは3つの状態をもち、それぞれロックがとれる時間を指定できます。
DONEとFAILEDは以前のバッチが成功もしくは失敗で終わっていることを示していて実行は完了しています。次にバッチが起動しても問題無い時間を指定しておきます。
RUNNINGは実行中なので、基本的にはロックは取れないのですが、何らかの問題でunlockが呼ばれていない可能性もあるのであまりにも長すぎるRUNNINGに対しては、長い時間を指定してロックを強制的にとることができます。
レコードが無い状態ではロックを行うと、成功してレコードが作成されます。
ロックが取れると以前の状態と更新時間を取得できるので、RUNNINGだった場合など適切な通知をすることができます。
[package]
name = "dy"
version = "0.1.0"
edition = "2018"
[dependencies]
anyhow = "1.0.35"
dynamodb-mutex = "0.1.0"
tokio = {version="~0.2", features=["macros", "rt-core"]}
use anyhow::Result;
use dynamodb_mutex::{
DynamoDbMutex,
DynamoDbMutexResult,
rusoto_core::Region,
};
#[tokio::main]
async fn main() -> Result<()> {
// DynamoDBのリージョン
// DONE状態から変更できる経過ミリ秒
// FAILED状態から変更できる経過ミリ秒
// RUNNING状態から変更できる経過ミリ秒
// テーブル名。Noneの場合はmutexes
let mutex = DynamoDbMutex::new(Region::UsEast1, 10000, 10000, 10000, None);
// テーブル構築
mutex.make_table().await?;
// ロックを取得。同期を取るためのキーとしてmutex_codeを指定する
match mutex.lock("test").await {
// ロック取得成功
Ok(DynamoDbMutexResult::Success(status, updated_at)) => {
// 以前の状態と以前の更新時間が取得できる
println!("pre status {:?} pre updated_at {}", status, updated_at);
// バッチ処理を行う
// アンロック。trueならDONE, falseならFAILEDで更新する
mutex.unlock("test", true).await?;
},
// ロック取得失敗。ロックが取得できない場合は何も処理せず終了させる
Ok(DynamoDbMutexResult::Failure) => println!("lock failed"),
// DynamoDB絡みのエラー
Err(err) => println!("dynamodb error {:?}", err),
}
Ok(())
}
排他制御の確認
10のスレッドを起こして同時実行します。この時ロックが取れたものは1、ロックが取れなかった場合は0を表示します。結果4番目がロックがとれて、それ以外はロックの取得に失敗していることがわかります。
[package]
name = "dy"
version = "0.1.0"
edition = "2018"
[dependencies]
anyhow = "1.0.35"
dynamodb-mutex = "0.1.0"
futures = "~0.3.8"
tokio = {version="~0.2", features=["macros", "rt-core"]}
use anyhow::Result;
use dynamodb_mutex::{
DynamoDbMutex,
DynamoDbMutexResult,
rusoto_core::Region,
};
use futures::{stream, StreamExt};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<()> {
let ary = (0..10).collect::<Vec<u32>>();
let size = ary.len();
let list = stream::iter(ary);
let mutex = Arc::new(DynamoDbMutex::new(
Region::UsEast1,
10000,
10000,
10000,
None,
));
let res = list
.map(|id| {
let mutex = Arc::clone(&mutex);
tokio::spawn(async move {
let res = match mutex.lock("test").await {
Ok(DynamoDbMutexResult::Success(_, _)) => 1,
Ok(DynamoDbMutexResult::Failure) => 0,
_ => -1,
};
format!("{}:{}", id, res)
})
})
.buffer_unordered(size);
res.for_each(|res| async move {
match res {
Ok(res) => println!("{}", res),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
Ok(())
}
4:1
1:0
8:0
5:0
9:0
0:0
7:0
2:0
6:0
3:0