コンテナ化やサーバレス化が進み、従来実装していた方法が通用しない場面が出てきています。
その代表的なものの一つがバッチ処理です。
従来はバッチ処理を行うサーバが存在し、crontabコマンドで設定などを行っていました。
しかしコンテナ化、サーバレス化の影響でバッチサーバというものは存在しなくなります。
AWS上でサーバレスなバッチ処理システムを構築するための方法の一つにCloudWatchEvents(最近、EventBridgeになりました。以下CWEと略します)を利用する方法があります。
しかし、このCWEにはイベントが多重に発生するという仕様があります。なお、Kubernetsの CronJob
やGCPの Cloud Scheduler
でも同様の仕様になっています。
バッチに同実行耐性や冪等性を持たせておけば問題ないのですが、長時間バッチや特殊処理を行うバッチ、作られたのが古くて手を入れづらいバッチなどはなかなか簡単にはいきません。
概要
今回は、CWE + StepFuntionsを使ったバッチ処理で同時実行を防止する方法を考えました。
バッチ処理自体はStepFuntionsからLambdaかECS Taskを呼び出して実行することを想定しています。
アーキテクチャとしては、LambdaとDynamoDBを使ったものになります。
アーキテクチャ
DynamoDBを利用する理由
排他制御を行うのに重要なのは、ReadとWriteをアトミックに行えることです。
Readとはロックされているかどうか確認する処理であり、Writeとはロックする処理です。
ぱっと思いつく利用できそうなやり方としては下記があります。
- Elasticache(Redis)を利用する
- RDS(Mysql)を利用する
RedisではReadとWriteをそのままではアトミックに処理できません。
luaスクリプト書けば出来ますが、ちょっと面倒です。
Mysqlは普通にトランザクション利用すればできますが、この処理のためだけに利用するにはちょっと重いです。
そこでDynamoDBを利用します。
DynamoDBはOptimisticLockを利用できるようになっており、サーバレスでスキーマレスなDBであり、利用も料金もかなりお手軽です。
StepFunctionsのステートグラフ
こんな感じになると思います。
アンロックはバッチ処理本体の成功失敗に関わらず必ず実行されるようにします。
{
"Comment": "CWE+StepFunctionsを利用したバッチ処理の排他制御ステートグラフ",
"StartAt": "Lock",
"States": {
"Lock": {
"Type": "Task",
"Resource": "ロック関数のARN",
"Parameters": {
"Key": "hogekey",
"Ttl": 300
},
"Catch": [
{
"ErrorEquals": ["LockError"],
"Next": "Lock Error"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "Other Lock Error"
}
],
"ResultPath": "$.Lock",
"Next": "Lock Ok"
},
"Lock Ok": {
"Type": "Pass",
"Next": "Run"
},
"Other Lock Error": {
"Type": "Pass",
"Next": "Lock Fail"
},
"Lock Error": {
"Type": "Pass",
"Next": "Lock Fail"
},
"Lock Fail": {
"Type": "Fail",
"Cause": "Can not get lock."
},
"Run": {
"Type": "Task",
"Resource": "バッチ処理のARN",
"Parameters": {
"Command": "hoge"
},
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "Run Error"
}
],
"ResultPath": "$.Cron",
"Next": "Run Ok"
},
"Run Ok": {
"Type": "Pass",
"Result": {
"statusCode": 200
},
"ResultPath": "$.Run",
"Next": "UnLock"
},
"Run Error": {
"Type": "Pass",
"Result": {
"statusCode": 400
},
"ResultPath": "$.Run",
"Next": "UnLock"
},
"UnLock": {
"Type": "Task",
"Resource": "アンロック関数のARN",
"Parameters": {
"key": "hogekey"
},
"ResultPath": "$.UnLock",
"Next": "Check Run Result"
},
"Check Run Result": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.Run.statusCode",
"NumericEquals": 200
},
"Next": "Run Fail"
}
],
"Default": "Run End"
},
"Run End": {
"Type": "Pass",
"End": true
},
"Run Fail": {
"Type": "Fail",
"Cause": "Run failed."
}
}
}
DynamoDBスキーマ
{
"Key": "hoge",
"Ttl": 300
}
- Key: 文字列
- プライマリーキーです。実行するバッチ処理の名前とか入れておくと良いと思います
- Ttl: 数値
- DynamoDBのttl機能を利用するための項目です
- アンロック処理が失敗した場合の保険です。最大48時間の遅延があります
ロック/アンロック関数
DynamoDBのPutでロックを、Deleteでアンロックを実現します。
実際にPut/Deleteする部分のみ抜粋します。
以下のlambdaコードはnode.jsです。
ロック
const DynamoDB = require('aws-sdk/clients/dynamodb');
const ddb = new DynamoDB.DocumentClient({
region: "リージョンを指定",
});
function LockError(message) {
this.name = 'LockError';
this.message = message;
}
LockError.prototype = new Error();
const lock = async (key, ttl) => {
const nowTime = Math.floor((new Date).getTime()/1000);
const ttlTime = nowTime + ttl; // DynamoDBに設定するttlは絶対時間
const params ={
TableName: "LockTable",
Item: {
Key: key,
Ttl: ttlTime,
},
ExpressionAttributeNames: {
'#k': 'Key',
},
ConditionExpression: 'attribute_not_exists(#k)', // 今回の一番の肝になる部分。アトミックにロック確認とロック処理を行える
};
try {
await ddb.put(params).promise();
} catch (e) {
const message = 'Lock Fail.';
throw new LockError(message); // 独自ロックエラーオブジェクト
}
};
// :
// 以下略
// :
アンロック
const DynamoDB = require('aws-sdk/clients/dynamodb');
const ddb = new DynamoDB.DocumentClient({
region: "リージョンを指定",
});
function UnLockError(message) {
this.name = 'UnLockError';
this.message = message;
}
UnLockError.prototype = new Error();
const unlock = async key => {
const params = {
TableName: "LockTable",
Key: {
Key: key
},
ExpressionAttributeNames: {
'#k': 'Key'
},
ConditionExpression: 'attribute_exists(#k)',
};
try {
await ddb.delete(params).promise();
} catch (e) {
const message = 'UnLock Fail.';
throw new UnLockError(message); // 独自アンロックエラーオブジェクト
}
};
// :
// 以下略
// :
まとめ
DynamoDBを利用することで簡単に排他制御を実装できました。
しかもDynamoDBは運用が非常に簡単です。
KubernetesやGCPを利用しているとまた違った方法が必要になりますが、ECSやLambdaを運用している場合はこれでなんとかなりそうです。
ただし、連続実行は防げないので、新しく作るバッチ処理はクラウドネイティブを意識して冪等性などを考えながら作っていく必要があります。