一、概要
本記事では、Ceph Clientモジュールにおいてロックの粒度が増えたことによる相互干渉の複雑化に対処するために設計・実装された「ロック管理機構(LockManager)」について解説する。従来の lockdep ベースの事後検出では性能劣化の問題があったため、本実装ではスレッドローカルに基づく事前検出方式を採用し、デッドロックの予防、ロック順序の管理、回帰ロックの検査、複数ロックの自動取得を一貫して制御する。また、ロックの重み付けに基づいた取得検査や、関数単位でのロックスコープ制御、LockGuard/autoLockといったユーティリティの導入により、実運用に耐えうる高いスレッド安全性を確保している。
二、背景
Clientモジュールのロック構造が再設計されたことにより、ロックの数が増えただけでなく、Ceph Client側の処理も複雑であるため、各ロックが保護するリソース同士が相互に関係する場面が多くなった。
その結果として、複数のロックがネストされる(入れ子になる)ケースが必然的に発生する。
複数ロックのネストにおいては、ロック取得順序が正しくなければデッドロックが発生する。例えば以下のようなケースである:
1. スレッド1がロックAを取得中、スレッド2がロックAを取得しようとしてブロックされる → デッドロック
2. スレッド1がロックA → B の順で取得、スレッド2が B → A の順で取得 → 相互待機によりデッドロック
3. スレッド1:A→B、スレッド2:B→C、スレッド3:C→A → 多段階循環依存によりデッドロック
4. 同一スレッドによるロックの重複取得(再取得)が不正に行われる → 想定外動作の原因
このような問題に対し、過去には Linuxカーネル等でも用いられる「lockdep アルゴリズム」が知られている。
lockdep はロック取得履歴をグラフ構造で追跡し、デッドロックの可能性を「事後検出」する方式であるが、以下のような課題がある:
• 全スレッド共通のロック履歴グローバル構造体が必要となり、これ自体にロックが必要
• 多スレッド環境で競合が激化し、性能が著しく低下する
• デッドロックが発生した「後で」しか分からない(事後検出)
そこで本実装では、上記の課題を回避するため、各スレッドが取得するロックの順序のみを個別に検査する「事前検出方式」を採用した。
このアプローチでは、スレッドローカル領域に記録された取得順序情報をもとにロックの正当性を評価するだけで済むため、グローバル競合が発生せず、性能への影響も最小限に抑えられる。
三、ロック管理状態機構
(一)ロック連携テーブルの整理
Clientモジュールのロックを分割した結果、全体で24種類のロックが存在する。それらの優先順位と依存関係を整理した「ロック連携テーブル(ロックネスト順一覧)」を以下に示す:
優先順位(高い) | 優先順位(低い) | |||||||
---|---|---|---|---|---|---|---|---|
client_lock | fh_lock | inode_map_lock | im_lock | mdsmap_lock | mds_sessions_lock | session_lock | mds_requests_lock | |
dn_lock | ||||||||
oc_lock | ||||||||
trim_lock | ||||||||
root_lock | ||||||||
opened_session_caps_lock | ||||||||
delayed_remove_caps_lock | ||||||||
snap_realms_lock | snaprealm_lock | |||||||
fd_or_odirs_lock | ||||||||
timer_lock | ||||||||
cr_inode_lock | ||||||||
free_inode_map_lock | ||||||||
generic_lock | ||||||||
doing_lock | ||||||||
cond_lock | ||||||||
qos_lock |
このテーブルでは、左から右に向かってロックの優先度が高い順になっており、同一列にあるロック同士は同じ優先度であることを示している。
優先度が同じロックは同時に取得してはならない。
たとえば、以下のような関係が読み取れる:
• client_lock > im_lock > session_lock > dn_lock
• client_lock > trim_lock
• im_lock > oc_lock
• fd_or_odirs_lock と timer_lock は相互に独立(依存関係なし)
このようなロック依存の優先関係を明確にすることで、ロック取得順の明示化とデッドロックの予防が可能となる。
(二)状態機構の定義
上記のロック優先テーブルをもとに、各ロックの状態を管理するための状態遷移機構(状態配列)を定義する。
const lock_state_t LockManager::lock_state[LOCK_MAX] = {
// larger recursive readwrite spinlock weight lock_ptr
[CLIENT_LOCK] = {0, false, false, false, 10, std::make_shared<LockWrapper<Client, Mutex>>(&Client::client_lock)},
[DOING_LOCK] = {0, false, false, false, 10, std::make_shared<LockWrapper<Client, Mutex>>(&Client::doing_lock)},
[FH_LOCK] = {CLIENT_LOCK, false, false, false, 20, std::make_shared<LockWrapper<Fh, Mutex>>(&Fh::fh_lock)},
[TRIM_LOCK] = {CLIENT_LOCK, false, false, false, 20, std::make_shared<LockWrapper<Client, Mutex>>(&Client::trim_lock)},
[INODE_MAP_LOCK] = {FH_LOCK, false, true, false, 30, std::make_shared<LockWrapper<Client, RWLock>>(&Client::inode_map_lock)},
[COND_LOCK] = {FH_LOCK, false, false, false, 30, std::make_shared<LockWrapper<MetaRequest, Mutex>>(&MetaRequest::cond_lock)},
[IM_LOCK] = {INODE_MAP_LOCK, true, false, false, 40, std::make_shared<LockWrapper<Inode, Mutex>>(&Inode::im_lock)},
[MDSMAP_LOCK] = {IM_LOCK, false, true, false, 50, std::make_shared<LockWrapper<Client, RWLock>>(&Client::mdsmap_lock)},
[OC_LOCK] = {IM_LOCK, false, false, false, 50, std::make_shared<LockWrapper<Client, Mutex>>(&Client::oc_lock)},
[ROOT_LOCK] = {IM_LOCK, false, true, false, 50, std::make_shared<LockWrapper<Client, RWLock>>(&Client::root_lock)},
[FD_OR_ODIRS_LOCK] = {IM_LOCK, false, false, true, 50, std::make_shared<LockWrapper<Client, spinlock>>(&Client::fd_or_odirs_lock)},
[TIMER_LOCK] = {IM_LOCK, false, false, false, 50, std::make_shared<LockWrapper<Client, Mutex>>(&Client::timer_lock)},
[CR_INODE_LOCK] = {IM_LOCK, false, false, false, 50, std::make_shared<LockWrapper<Client, Mutex>>(&Client::cr_inode_lock)},
[FREE_INODE_MAP_LOCK] = {IM_LOCK, false, false, true, 50, std::make_shared<LockWrapper<Client, spinlock>>(&Client::free_inode_map_lock)},
[GENERIC_LOCK] = {IM_LOCK, false, true, false, 50, std::make_shared<LockWrapper<Client, RWLock>>(&Client::generic_lock)},
[QOS_LOCK] = {IM_LOCK, false, false, false, 50, std::make_shared<LockWrapper<QoSClient, Mutex>>(&QoSClient::qos_lock)},
[MDS_SESSIONS_LOCK] = {MDSMAP_LOCK, false, true, false, 60, std::make_shared<LockWrapper<Client, RWLock>>(&Client::mds_sessions_lock)},
[SESSION_LOCK] = {MDS_SESSIONS_LOCK, false, false, true, 70, std::make_shared<LockWrapper<MetaSession, std::shared_ptr<spinlock>>>(&MetaSession::session_lock)},
[MDS_REQUESTS_LOCK] = {SESSION_LOCK, false, true, false, 80, std::make_shared<LockWrapper<Client, RWLock>>(&Client::mds_requests_lock)},
[DN_LOCK] = {SESSION_LOCK, false, false, false, 80, std::make_shared<LockWrapper<Dentry, Mutex>>(&Dentry::dn_lock)},
[OPENED_SESSION_CAPS_LOCK] = {SESSION_LOCK, false, false, false, 80, std::make_shared<LockWrapper<Client, Mutex>>(&Client::opened_session_caps_lock)},
[DELAYED_REMOVE_CAPS_LOCK] = {SESSION_LOCK, false, false, true, 80, std::make_shared<LockWrapper<Client, spinlock>>(&Client::delayed_remove_caps_lock)},
[SNAP_REALMS_LOCK] = {SESSION_LOCK, false, false, true, 80, std::make_shared<LockWrapper<Client, spinlock>>(&Client::snap_realms_lock)},
[SNAPREALM_LOCK] = {SNAP_REALMS_LOCK, true, false, false, 90, std::make_shared<LockWrapper<SnapRealm, Mutex>>(&SnapRealm::snaprealm_lock)}
};
この状態配列は、client_lock から snaprealm_lock に至るまで、優先度の高い順に一意のインデックスで管理されており、各エントリは次の情報を保持する:
フィールド | 説明 |
---|---|
larger | 本ロックよりも優先度が高いロックのインデックス(存在しない場合は0) |
recursive | 回帰ロック(再帰ロック)かどうかのフラグ(例:im_lock, snaprealm_lock) |
rwlock | 読み書きロックであるかどうか |
spinlock | スピンロックであるかどうか |
weight | ロックの重み値(優先度判断に使用) |
lock_ptr | 実際にロック/アンロックを行う関数ポインタ(テンプレートで処理) |
これにより、ロックの取得および解放処理は統一的かつ安全に行える。
(三)ロックマネージャの実装
上述のロック状態機構を活用するため、Client モジュール全体のロックを一元管理するためのモジュール:LockManager を実装した。
🔹 thread_local によるスレッド状態の管理
LockManager 内部では、各スレッドごとに独立した状態を保持するため、thread_local 変数を定義している。
この変数は、スレッドが取得したロック情報(ロック順・カウント・再帰状況など)を記録・検証するためのものである。
提供されている主なインターフェースは以下の通り:
• register_tl_static():スレッド開始時に状態構造体を登録
• destroy_tl_static():スレッド終了前に登録状態を破棄
• acquireLock():ロックを取得(検証付き)
• dropLock():ロックを解放(検証付き)
🔹 スレッド制御の前提と用途
Client モジュールは中間層にあたるため、上位は ceph-fuse や ganesha といった実装からスレッドが呼び出される。
このため、Client内部で新規に生成するスレッド以外は、上位層由来のスレッドであり、そのロック管理を追跡するには、Clientに入るタイミングで register_tl_static() を明示的に呼び出す必要がある。
ロックが必要な箇所では acquireLock() を用いてロックを取得し、処理終了後は dropLock() にて解放する。
スレッドの処理終了時には、必ず destroy_tl_static() を呼び出して状態を破棄する。
acquireLock(INODE_MAP_LOCK, *this, READ);
acquireLock(IM_LOCK, *in);
acquireLock(SESSION_LOCK, *s);
dropLock(SESSION_LOCK, *s);
dropLock(INODE_MAP_LOCK, *this, READ);
//....
🔹 自動登録・解放を支援するスコープ管理ユーティリティ
ロックマネージャには、LockManagerScope という自動登録・自動解放のスコープクラスも提供されており、以下のように利用することで、RAIIパターンにより状態管理を簡素化できる。
(四)スコープロックユーティリティ:LockGuard
関数やコードブロック単位でのロック制御を簡潔に行うため、std::lock_guard に似たスコープベースのロック制御クラス LockGuard を実装している。
この LockGuard は、指定したロック対象オブジェクトとロック種別をテンプレートで受け取り、RAII(Resource Acquisition Is Initialization)パターンで自動的にロック/アンロック処理を行う。
LockGuard<Inode, IM_LOCK> guard(lock_manager, *in);
このように、クラス名とロック種別をテンプレート引数として与えることで、LockManager 経由で安全にロック操作を行うことができる。
• スコープに入った瞬間に acquireLock() を呼び出し
• スコープから抜ける際に dropLock() が自動で実行される
このユーティリティにより、ロック忘れや解除漏れを防止し、可読性と安全性を大幅に向上させることができる。
(五)自動ロック機構:autoLock
複数のロックを一括かつ正しい順序で安全に取得するために、LockManager には自動ロック機構 autoLock() が実装されている。
この機能は、任意個数のロック取得を一度に指定可能であり、特に次のようなユースケースに適している:
• 多数のロックを一度に安全な順序で取得したい場合
• 一時的にロック順を変更したい処理
autoLock(IM_LOCK, std::ref(*in), -1, false,
INODE_MAP_LOCK, std::ref(*client), WRITE, false);
• 4つを1セット(ロック種別、対象オブジェクト、モード、回帰チェックスキップ)とし、引数の数は4の倍数である必要がある
• mode は読書きモード指定(例:RWMode::READ / WRITE)
• skip_recur_lock_check は回帰ロック順チェックを省略するかどうかのフラグ(既知の安全領域で使用)
🔹 処理概要
- すでに保持しているロックがあれば dropLock() で一度解放
- 渡されたロックセットを優先度順にソート(weight に基づく)
- ソート済み順序で acquireLock() を順次実行し、安全な状態に整える
このように autoLock を使用することで、複数ロックの取得が必要な場面でもコードの簡潔さと安全性を両立できる。
四、検査方式:ロック取得・解放の正当性チェック
ロックの取得や解放にあたって、LockManager はスレッドごとの状態に基づいてロック操作の正当性を事前に検証する仕組みを備えている。
(一)acquireLock:ロック取得の検査
acquireLock 関数は、指定されたロックを取得する前に以下の検査を実施する:
1. 通常ロックの取得順序が正しいか(can_lock)
2. 再帰ロックが許可されているか(can_recursive_lock)
3. ロック取得操作を実行(lock)
template <typename T>
void acquireLock(LockType lock, T& object, int mode = -1, bool skip_recur_lock_check = false) {
can_lock(lock); // 順序・重複の整合性検査
if (!skip_recur_lock_check)
can_recursive_lock(lock, object); // 再帰ロック検査
lock_state[lock].lock_info->lock(static_cast<void*>(&object), mode); // 実際のロック処理
}
🔹 can_lock の検査内容
この関数は、以下の条件を満たすかを検証する:
• 通常ロックの重複取得は禁止(カウントが1以上ならエラー)
• 取得しようとするロックの weight(重み)が前回取得より低い必要がある
• 同じ weight のロックは、再帰ロックのみ許可
void LockManager::can_lock(LockType lock) {
if (!lock_state[lock].recursive && (lock_static->lock_count.count(lock) > 0)) {
// 非再帰ロックの取得回数は1回を超えてはならない
lderr(client->cct) << "repeated lock, lock type: " << get_lock_name(lock) << dendl;
ceph_assert("repeated lock" == 0);
}
if (lock_static->last_weight <= lock_state[lock].weight) {
// 取得しようとするロックの重み値が前回より高ければ取得可能
if ((lock_static->last_weight == lock_state[lock].weight) &&
(!lock_state[lock].recursive)) {
// 同じ重み値の場合は再帰ロックのみ取得を許可する
lderr(client->cct) << "lock type: " << get_lock_name(lock) << " is no interaction" << dendl;
goto fail;
}
auto it = lock_static->lock_count.find(lock);
if (it != lock_static->lock_count.end()) {
// ロック取得履歴に既に記録がある場合、取得回数をチェックする
if (!lock_state[lock].recursive) {
// 非再帰ロックは取得回数が最大1回まででなければならない
lderr(client->cct) << "lock type: " << get_lock_name(lock)
<< " do not allow non-recursive to exceed the lock layer!" << dendl;
ceph_assert("not recursive exceed layer" == 0);
}
if (it->second >= RECUR_LOCK_MAX_NUM) {
// 再帰ロックの取得回数は最大2回までとする
lderr(client->cct) << "lock type: " << get_lock_name(lock)
<< " do not allow recursive to exceed the lock layers!" << dendl;
ceph_assert(it->second == BASIC_LOCK_MAX_NUM);
}
it->second++; // 取得回数が上限内であれば、再帰ロックの取得回数をインクリメント
} else {
// 初回ロック取得時には履歴に記録を追加(再帰か否かに関わらず)
lock_static->lock_count[lock] = BASIC_LOCK_MAX_NUM;
lock_static->total_weight += lock_state[lock].weight; // 合計重み値に加算、再帰ロックの2回目取得時は加算されない
}
lock_static->last_weight = lock_state[lock].weight; // 今回のロック取得の重みを記録
return;
}
fail:
lderr(client->cct) << "lock has last_weight: " << lock_static->last_weight
// 上記検査に失敗:ロック順序違反または二重取得のためアサーション
<< " but acquire lock last_weight: " << lock_state[lock].weight
<< " current lock: " << get_lock_name(lock)
<< dendl;
ceph_assert("lock order error!" == 0);
}
チェックに合格すると、ロックカウントや weight を記録し、ロック取得処理へ進む。
🔹 can_recursive_lock の検査内容(回帰ロック)
回帰ロックの場合、以下のような追加検査が行われる:
• IM_LOCK:親→子Inodeの順になっているかを確認(業務ロジックに基づく)
• SNAPREALM_LOCK:子realm→親realmの順であるかを確認
template<typename T>
void can_recursive_lock(LockType lock, T& object) {
if (!lock_state[lock].recursive)
return;
auto it = lock_static->lock_count.find(lock);
if (it != lock_static->lock_count.end() && it->second == RECUR_LOCK_MAX_NUM) {
// 再帰ロックの2回目のロック取得時
if (lock == IM_LOCK) {
// Client モジュールを呼び出して、親ディレクトリ → 子の順かどうかを検証する
ceph_assert(lock_static->recur_static.p_inode != nullptr);
ceph_assert(lock_static->recur_static.p_inode != static_cast<void*>(&object));
ceph_assert(client->inode_relation_check(static_cast<void*>(lock_static->recur_static.p_inode), static_cast<void*>(&object)));
} else if (lock == SNAPREALM_LOCK) {
// Client モジュールを呼び出して、子 snaprealm → 親 snaprealm の順かどうかを検証する
ceph_assert(lock_static->recur_static.c_realm != nullptr);
ceph_assert(lock_static->recur_static.c_realm!= static_cast<void*>(&object));
ceph_assert(client->realm_relation_check(static_cast<void*>(&object), static_cast<void*>(lock_static->recur_static.c_realm)));
}
}
recursive_lock_record(lock, object); // 検証に通過したら記録を行う
}
ロック取得処理のための特化テンプレートの実装は以下の通りであり、
lock を呼び出す際にロックの種類に応じた適切な操作が自動的に行われる。
struct LockWrapperBase {
virtual void lock(void* object, int mode) const = 0;
virtual void unlock(void* object, int mode) const = 0;
virtual bool is_locked(void* object, int mode) const = 0;
virtual bool is_locked_by_me(void* object) const = 0;
virtual ~LockWrapperBase() = default;
};
template<typename T, typename LockType>
struct LockWrapper : public LockWrapperBase {
LockType T::*lock_ptr;
LockWrapper(LockType T::*ptr) : lock_ptr(ptr) {}
void lock(void* object, int mode) const override {
(static_cast<T*>(object)->*lock_ptr).Lock();
}
void unlock(void* object, int mode) const override {
(static_cast<T*>(object)->*lock_ptr).Unlock();
}
bool is_locked(void* object, int mode) const override {
return (static_cast<T*>(object)->*lock_ptr).is_locked();
}
bool is_locked_by_me(void* object) const override {
return (static_cast<T*>(object)->*lock_ptr).is_locked_by_me();
}
};
// ユーザー定義のロッククラス(例:Mutex)に適合するテンプレート実装
template<typename T>
struct LockWrapper<T, RWLock> : public LockWrapperBase {
RWLock T::*lock_ptr;
LockWrapper(RWLock T::*ptr) : lock_ptr(ptr) {}
void lock(void* object, int mode) const override {
if (mode == RWMode::READ) {
(static_cast<T*>(object)->*lock_ptr).get_read();
} else {
(static_cast<T*>(object)->*lock_ptr).get_write();
}
}
void unlock(void* object, int mode) const override {
if (mode == RWMode::READ) {
(static_cast<T*>(object)->*lock_ptr).put_read();
} else {
(static_cast<T*>(object)->*lock_ptr).put_write();
}
}
bool is_locked(void* object, int mode) const override {
if (mode == -1) {
return (static_cast<T*>(object)->*lock_ptr).is_locked();
} else if (mode == RWMode::WRITE) {
return (static_cast<T*>(object)->*lock_ptr).is_wlocked();
}
}
bool is_locked_by_me(void* object) const override {
return false;
}
};
// 読み書きロック(RWLock)に対応する特化テンプレート
// 他にも spinlock や shared_ptr<spinlock> に対応するテンプレート特殊化が存在する。
(二)dropLock: 実装と解説
以下は dropLock の実装例です:
// ロック解除を行うテンプレート関数
// skip_recur_lock_check が false の場合、再帰ロックのチェックも行う
template <typename T>
void dropLock(LockType lock, T& object, int mode = -1, bool skip_recur_lock_check = false) {
if (!skip_recur_lock_check)
can_drop_recursive_lock(lock, object); // 再帰ロックの解除に伴う記録更新と検査
can_drop_lock(lock); // 総合的な解放妥当性の検査
lock_state[lock].lock_info->unlock(static_cast<void*>(&object), mode); // 実際のロック解除操作
}
この dropLock は acquireLock と対になるもので、ロック解除時に再帰ロックであればまずロック記録を更新し、その後ロック解除の妥当性を確認します。
以下は再帰ロックに関する解除記録の処理です:
template<typename T>
void can_drop_recursive_lock(LockType lock, T& object) {
if (lock == IM_LOCK) {
ceph_assert(lock_static->recur_static.p_inode != nullptr);
if (static_cast<void*>(&object) == lock_static->recur_static.p_inode) {
// pinode -> nullptr または cinode -> nullptr など記録を更新
recursive_unlock_record(lock, true);
} else {
recursive_unlock_record(lock, false);
}
} else if (lock == SNAPREALM_LOCK) {
ceph_assert(lock_static->recur_static.c_realm != nullptr);
if (static_cast<void*>(&object) == lock_static->recur_static.c_realm) {
// crealm -> nullptr または prealm -> nullptr など記録を更新
recursive_unlock_record(lock, true);
} else {
recursive_unlock_record(lock, false);
}
}
}
次に、実際のロック解除の妥当性を確認する can_drop_lock 関数です:
void LockManager::can_drop_lock(LockType lock) {
auto it = lock_static->lock_count.find(lock);
if (it != lock_static->lock_count.end()) {
int count = --it->second; // ロック数の記録をデクリメント
if (count == NONE_LOCK_STATUS) {
// ロック解放完了:記録を削除し、重みも調整
lock_static->lock_count.erase(it);
lock_static->total_weight -= lock_state[lock].weight;
}
ceph_assert(lock_static->total_weight >= 0); // ロックと解除のバランス不一致
// 重みが減少した場合、last_weight を更新
if (lock_static->total_weight <= lock_static->last_weight) {
lock_static->last_weight = lock_static->total_weight;
}
if (count == NONE_LOCK_STATUS) {
if (lock_static->last_weight <= lock_state[lock].weight) {
// より大きなロックが残っている場合、それを新しい last_weight とする
if (lock_static->lock_count.size() > 1) {
auto new_it = std::prev(lock_static->lock_count.end());
lock_static->last_weight = lock_state[new_it->first].weight;
}
}
}
} else {
// ロックが存在しない状態で解除しようとした → エラー
lderr(client->cct) << "unlock is not allowed when unlocked! lock type: " << get_lock_name(lock) << dendl;
ceph_assert("cannot unlocked repeated!" == 0);
}
}
最終的に、テンプレートに基づいた unlock() 実行を通じて、オブジェクトに対して正しいロック解除が行われます。
(三)isLocked と isLockedByMe の実装
// ロック状態を確認する関数
// lock_info オブジェクト経由で現在ロックされているかを確認
template <typename T>
bool isLocked(LockType lock, T& object, int mode = -1) {
return lock_state[lock].lock_info->is_locked(static_cast<void*>(&object), mode);
}
// 自スレッドがロックを保持しているかを確認
template <typename T>
bool isLockedByMe(LockType lock, T& object) {
return lock_state[lock].lock_info->is_locked_by_me(static_cast<void*>(&object));
}
この2つの関数は、与えられたロックが現在保持されているか、あるいは現在のスレッドがそのロックを保持しているかを確認するために用います。これにより、デバッグや安全な処理のための状態確認が可能になります。
(四)再帰ロックに関する記録とルール
現在、Client モジュールにおいて再帰ロック(リカージョンロック)に指定されているのは、IM_LOCK(Inode 保護用)と SNAPREALM_LOCK(SnapRealm 保護用)の 2 種類のみです。
一般的に再帰ロックは「同一リソースに対し複数回ロック取得を許容する」という性質を持ち、呼び出し側の設計によってデッドロックを防ぐ前提で使用されます。しかし、Client モジュールでは業務上の安全性を担保するため、再帰ロックであってもロック取得には厳密な順序ルールが必要です。
IM_LOCK のロック取得ルール
• 同一スレッド内で 最大2階層までの IM_LOCK を取得可能。
• 必ず親ディレクトリ → 子ディレクトリ/ファイル の順にロックを取得すること。
• 子 → 親の順序での取得や、無関係な2つの Inode に対する同時ロック取得は禁止。
SNAPREALM_LOCK のロック取得ルール
• 同一スレッド内で 最大2階層までの SNAPREALM_LOCK を取得可能。
• 必ず子 SnapRealm → 親 SnapRealm の順にロックを取得すること。
• 親 → 子の順序や、関連性のない2つの SnapRealm への同時ロック取得は禁止。
ロック記録の方式
再帰ロックを取得する際には、以下のような構造で記録されます:
場合1
parent | children | |
---|---|---|
1 Lock(P) | p_inode | nullptr |
2 Lock(C) | p_inode | c_inode |
3 Unlock(C) | p_inode | nullptr |
4 Unlock(P) | nullptr | nullptr |
場合2
parent | children | |
---|---|---|
1 Lock(P) | p_inode | nullptr |
2 Lock(C) | p_inode | c_inode |
3 Unlock(P) | c_inode | nullptr |
4 Lock(CC) | c_inode | cc_inode |
5 Unlock(C) | cc_inode | nullptr |
6 Unlock(CC) | nullptr | nullptr |
また、IM_LOCK のような再帰ロックを取得する際には、そのロック単体のルールだけでなく、他のロックとの取得順序ルールとの整合性も同時に検証されます。
実装内でのロック記録の処理
実際の記録・解除処理は以下の関数で行われます:
template<typename T>
void recursive_lock_record(LockType lock, T& object) {
if (lock == IM_LOCK) {
if (!lock_static->recur_static.p_inode)
lock_static->recur_static.p_inode = static_cast<void*>(&object);
else
lock_static->recur_static.c_inode = static_cast<void*>(&object);
} else if (lock == SNAPREALM_LOCK) {
if (!lock_static->recur_static.c_realm)
lock_static->recur_static.c_realm = static_cast<void*>(&object);
else
lock_static->recur_static.p_realm = static_cast<void*>(&object);
}
}
void recursive_unlock_record(LockType lock, bool adjust_record) {
if (lock == IM_LOCK) {
if (adjust_record) {
if (lock_static->recur_static.c_inode) {
lock_static->recur_static.p_inode = lock_static->recur_static.c_inode;
lock_static->recur_static.c_inode = nullptr;
} else {
lock_static->recur_static.p_inode = nullptr;
}
} else {
lock_static->recur_static.c_inode = nullptr;
}
} else if (lock == SNAPREALM_LOCK) {
if (adjust_record) {
if (lock_static->recur_static.p_realm) {
lock_static->recur_static.c_realm = lock_static->recur_static.p_realm;
lock_static->recur_static.p_realm = nullptr;
} else {
lock_static->recur_static.c_realm = nullptr;
}
} else {
lock_static->recur_static.p_realm = nullptr;
}
}
}
この親子関係のチェックは、業務要件に基づいたものであるため、Client モジュール側のインターフェースを通じてアプリケーション層のロジックに確認を委ねています。
(五)スコープロック(LockGuard)
スコープロックの実装は以下のようになっています(コードは省略):
template <typename T, LockType lock>
class LockGuard {
public:
LockGuard(LockManager& manager, T& object, int mode = -1, bool skip_recur_lock_check = false)
: mgr(manager), obj(object), md(mode), sk_recu_lock_check(skip_recur_lock_check) {
ceph_assert(&mgr);
mgr.acquireLock(lock, obj, md, sk_recu_lock_check);
}
LockGuard(const LockGuard&) = delete;
LockGuard& operator=(const LockGuard&) = delete;
LockGuard(LockGuard&& other) noexcept
: mgr(other.manager), obj(other.obj), md(other.mode), sk_recu_lock_check(other.skip_recur_lock_check) {
other.obj = nullptr;
}
LockGuard& operator=(LockGuard&& other) noexcept {
if (this != &other) {
mgr.dropLock(lock, obj, md, sk_recu_lock_check);
mgr = other.mgr;
obj = other.obj;
md = other.md;
sk_recu_lock_check = other.sk_recu_lock_check;
other.obj = nullptr;
}
return *this;
}
~LockGuard() {
mgr.dropLock(lock, obj, md, sk_recu_lock_check);
}
private:
LockManager& mgr;
T& obj;
int md;
int sk_recu_lock_check;
};
これはつまり、LockGuard を構築することでスコープ開始時に自動的にロックを取得し、スコープ終了時(オブジェクトの破棄時)に自動的にロック解除する RAII パターンを実装したものです。
(六)自動ロック取得(autoLock)
自動ロック機構の実装は以下のようになっています:
template <typename... LockArgs>
void autoLock(LockArgs&&... args) {
static_assert(sizeof...(args) % 4 == 0, "Lock args must be in sets of four: (LockType, object, mode, skip_recur_lock_check)"); // 引数の数は4の倍数でなければならない
auto lock_tuple = std::make_tuple(std::forward<LockArgs>(args)...);
std::map<int, std::unique_ptr<auto_lock_base>> lock_order;
dropLocks(lock_order, lock_tuple, std::make_index_sequence<sizeof...(args) / 4>()); // 既存ロックを解放
acquireLocks(lock_order); // ソートされた順にロック取得
}
ここで dropLocks は、C++17 のパラメーターパック展開機能を活用しています:
template <typename Tuple, std::size_t I>
void dropLockSet(std::map<int, std::unique_ptr<auto_lock_base>>& lock_order, Tuple& lock_tuple, std::integral_constant<std::size_t, I>) {
LockType lock = std::get<4 * I>(lock_tuple);
auto& object = std::get<4 * I + 1>(lock_tuple);
int mode = std::get<4 * I + 2>(lock_tuple);
bool skip_recur_lock_check = std::get<4 * I + 3>(lock_tuple);
if (lock_static->lock_count.count(lock) > 0)
dropLock(lock, object, mode, skip_recur_lock_check); // ロックがすでに存在する場合は解放
lock_order.emplace(static_cast<int>(lock), std::make_unique<auto_lock<std::decay_t<decltype(object)>>>(object, mode, skip_recur_lock_check)); // ソート順に登録
}
template <typename Tuple, std::size_t... I>
void dropLocks(std::map<int, std::unique_ptr<auto_lock_base>>& lock_order, Tuple& lock_tuple, std::index_sequence<I...>) {
(dropLockSet(lock_order, lock_tuple, std::integral_constant<std::size_t, I>{}), ...); // tuple から自動的に展開
}
ロックの取得は acquireLocks により行われます:
void acquireLocks(std::map<int, std::unique_ptr<auto_lock_base>>& lock_order) {
for (const auto& lo : lock_order) {
if (lo.second) {
int lock_class = lock_state[lo.first].lock_class;
auto_lock_base *base_record = lo.second.get();
switch(lock_class) {
case CLIENT:
if (auto record = dynamic_cast<auto_lock<Client>*>(base_record)) {
auto& obj = record->object;
acquireLock(static_cast<LockType>(lo.first), obj, record->mode, record->skip_recur_lock_check);
} else {
ceph_assert("convert to Client failed!" == 0);
}
break;
case FH:
if (auto record = dynamic_cast<auto_lock<Fh>*>(base_record)) {
auto& obj = record->object;
acquireLock(static_cast<LockType>(lo.first), obj, record->mode, record->skip_recur_lock_check);
} else {
ceph_assert("convert to Fh failed!" == 0);
}
break;
// ...(他のクラスも同様に処理)
}
}
}
}
五、重み計算の例
以下の表は、極端なロック取得・解除のシナリオを示したものであり、実際の業務処理を反映するものではありません。この例では、各値の変動や判断ロジックの流れを視覚的に確認することができます。表中に赤字で示されている箇所は、チェックに失敗した処理を意味します。
操作 | weight | last_weight | total_weight | lock_count |
---|---|---|---|---|
acquireLock(CLIENT_LOCK) | 10 | 10 | 10 | [CLIENT_LOCK]=1 |
acquireLock(TRIM_LOCK) | 20 | 20 | 30 | [TRIM_LOCK]=1 |
acquireLock(IM_LOCK) (P) | 40 | 40 | 70 | [IM_LOCK]=1 |
acquireLock(IM_LOCK) (C) | 40 | 40 | 70 | [IM_LOCK]=2 |
acquireLock(DN_LOCK) | 80 | 80 | 150 | [DN_LOCK]=1 |
dropLock(IM_LOCK) (P) | 40 | 80 | 150 | [IM_LOCK]=1 |
acquireLock(IM_LOCK) (P)(逆序) | 40 | 80 | 150 | × |
dropLock(DN_LOCK) | 80 | 40 | 70 | [DN_LOCK]=0,erase |
acquireLock(IM_LOCK) (P)(逆父子) | 40 | 40 | 70 | × |
acquireLock(SNAP_REALMS_LOCK) | 80 | 80 | 150 | [SNAP_REALMS_LOCK]=1 |
acquireLock(SNAPREALM_LOCK)(CS) | 90 | 90 | 240 | [SNAPREALM_LOCK]=1 |
dropLock(SNAP_REALMS_LOCK) | 80 | 90 | 160 | [SNAP_REALMS_LOCK]=0,erase |
dropLock(TRIM_LOCK) | 20 | 90 | 140 | [TRIM_LOCK]=0,erase |
acquireLock(IM_LOCK)(CC)(逆序) | 40 | 90 | 140 | × |
acquireLock(SNAPREALM_LOCK)(PS) | 90 | 90 | 140 | [SNAPREALM_LOCK]=2 |
dropLock(SNAPREALM_LOCK)(PS) | 90 | 90 | 140 | [SNAPREALM_LOCK]=1 |
dropLock(SNAPREALM_LOCK)(CS) | 90 | 40 | 50 | [SNAPREALM_LOCK]=0 |
acquireLock(MDSMAP_LOCK) | 50 | 50 | 100 | [MDSMAP_LOCK]=1 |
acquireLock(ROOT_LOCK) | 50 | 50 | × | × |
dropLock(IM_LOCK) | 40 | 50 | 60 | [IM_LOCK]=0,erase |
dropLock(MDSMAP_LOCK) | 50 | 10 | 10 | [MDSMAP_LOCK]=0,erase |
dropLock(CLIENT_LOCK) | 10 | 0 | 0 | [CLIENT_LOCK]=0,erase |
六、注意点
(1) 特定ケースでの再帰ロックチェックのスキップ
現在の Client 実装では、IM_LOCK および SNAPREALM_LOCK の再帰ロックに関して、使用箇所が明確に限定されているため、一部の処理では 親子関係のチェックをスキップ することができます。
• IM_LOCK の例
o link() や unlink() 関数において、リンク前に Inode 間の親子関係が明確なため、再帰ロックチェックは省略可能。
• SNAPREALM_LOCK の例
o adjust_realm_parent() 関数では、子 Realm から親 Realm を明示的に取得して操作しているため、チェックは不要。
このように 親子関係が事前に確定している場合には、skip_recur_lock_check = true として性能向上を図る ことが可能です。ただし、新規機能や複数 Inode にまたがるロック処理を追加する場合には、チェックの省略は避けてください。
(2) LockManager のスレッドローカル登録
上位層の関数からロック処理を始める場合は、必ず register_tl_static() を呼び出し、終了時に destroy_tl_static() を実行する必要があります。あるいは LockManagerScope を使用することで自動登録も可能です。
これはスレッドごとのロック状態を追跡するための仕組みであり、登録せずにロック処理を行うとアサーション失敗になります。
試験的に一部のロックだけをテストする場合は、has_register_tl_static() を使用して登録済みかどうかを確認できます。
(3) セマフォを使用した待機処理は対象外
セマフォによる待機(例:信号待ちなど)では、実質的にスレッドは ロックを解放した停止状態 にあり、この間に他のロックを取得することはありません。そのため、こうした箇所ではロック状態の記録・監視は不要です。