本稿は、できる限り多くの方、特にRTOSの勉強がてら拝見されている方にはぜひ読んでいただきたいと考えます。本稿で扱っているmutexの機構は、RTOSで非常に重要な役割であり、Zephyrに限らず、どのRTOSでも同様の実装がされているためです。
ちなみにLinuxでも優先度継承機能を備えています。
排他制御とは
皆さんご存知だと思いますが、同一のリソースを複数のスレッドが同時にアクセスして、システムに不整合が発生することを防ぐための機能を排他制御と言います。
競合はスレッド間、スレッドと割込み処理、割込み処理間などが考えられます。
これまでに出てきたirq_lock ( )はスレッドと割込みの競合や割込み間の競合を防ぎ、本稿で説明するmutexはスレッド間の競合を防ぎます。
mutexについては下記3つのシステムコールを用意しています。
void k_mutex_init(struct k_mutex *mutex);
int k_mutex_lock(struct k_mutex *mutex, s32_t timeout);
void k_mutex_unlock(struct k_mutex *mutex);
数も少ないため、早速、これらのコードリーディングをしたいところですが、本機能を語る上で欠かせない事があります。リアルタイム性能に関する話ですが、優先度逆転問題です。この問題とその解決策を述べた上でmutexの実装を見てみたいと思います。
優先度逆転問題
皆さんご存知でしょうか。
1997年にNASA の火星探査機マーズ・パスファインダーが火星でシステム再起動してしまうという重大な問題が発生しました。
Wikipediaに記載されるほど有名な問題です。「リンク:優先順位の逆転」
この問題の原因が、他ならぬ優先度逆転問題でした。
まずは優先度逆転問題とは何かを整理しましょう。
問題事例とあるべき姿を上下に載せました。
高優先度Cが低優先度Aが獲得している資源を待っている間に中優先度Bが実行可能状態となると、AからCPUを奪いBが実行されます。(本図の上段のt4~t5)
この区間が問題です。これによって、高優先度スレッドCがt6まで遅延させられてしまいます。
この事例の場合、t4からt5の区間で高優先度Cと中優先度Bの優先度が逆転してしまっており、これを優先度逆転問題と言います。
一方で、下段のあるべき姿では中優先度Bが実行可能状態になっても高優先度Cが待ち状態であるため、中優先度Bをブロックして低優先度Aのクリティカルセクションの実行を継続します。こうすることでクリティカルセクション終了時のスケジューリングによって高優先度Cをディスパッチでき、遅延を最小限に留めることが可能です。(上段ではt6から実行していた処理を下段ではt5より前に実行できており、t6時点では処理を完了している)
優先度逆転回避アルゴリズム
RTOS (Real Time OS)はその名の通り、リアルタイム応答性能が非常に重要で、前述の問題事例のようなことはあってはならないことです。
では前のあるべき姿の処理順にするためにはどうすれば良いでしょうか。ポイントはmutex獲得中のスレッドAの優先度です。
以下に優先度の逆転を回避する仕組みを2つ紹介します。
優先度継承プロトコル
前図のt3の段階で資源(mutex)のオーナースレッド(獲得しているスレッド)Aの優先度を高優先度Cの優先度に一時的にブーストします。
上図では、スレッドAはtXからtYの区間でブーストします。
そうすると、中優先度Bが実行可能になってもAの方が優先度が高いため、Aのクリティカルセクションの実行を継続できます。そして、Aがmutex解放時に元の低優先度に戻し、スケジューラは高優先度Cをディスパッチします。
優先度上限プロトコル
Aがmutexを獲得した時点で予め設定していた高優先度の値を使用します。通常は、最高優先度のスレッド以上の優先度を設定しておけば期待通りの動作になります。
mutexの実装
Zephyrでは優先度継承プロトコルを用いて優先度逆転問題を回避しています。
それを意識しつつ、コードを見てみましょう。
まずはk_mutexのデータ構造です。
struct k_mutex {
_wait_q_t wait_q; //mutex獲得待ちのためのウェイトキュー
struct k_thread *owner; //獲得しているスレッド(オーナースレッド)
u32_t lock_count; //ロック回数
int owner_orig_prio; //オーナースレッドのオリジナル優先度
_OBJECT_TRACING_NEXT_PTR(k_mutex);
};
mutexの初期化処理
void _impl_k_mutex_init(struct k_mutex *mutex)
{
mutex->owner = NULL;
mutex->lock_count = 0;
/* initialized upon first use */
/* mutex->owner_orig_prio = 0; */
sys_dlist_init(&mutex->wait_q);
SYS_TRACING_OBJ_INIT(k_mutex, mutex);
_k_object_init(mutex);
}
下記の通りk_mutex構造体mutexの各メンバを初期化しています。
owner:NULL
lock_count:0
wait_q:headとtailを自分自身にポイント。
mutexの獲得処理
mutexは獲得処理と開放処理が重要なので丁寧に見ましょう。
int _impl_k_mutex_lock(struct k_mutex *mutex, s32_t timeout)
{
1 int new_prio, key;
2 _sched_lock();
3 if (likely(mutex->lock_count == 0 || mutex->owner == _current)) {
4 RECORD_STATE_CHANGE();
5 mutex->owner_orig_prio = mutex->lock_count == 0 ?
_current->base.prio :
mutex->owner_orig_prio;
6 mutex->lock_count++;
7 mutex->owner = _current;
8 K_DEBUG("%p took mutex %p, count: %d, orig prio: %d\n",
_current, mutex, mutex->lock_count,
mutex->owner_orig_prio);
9 k_sched_unlock();
10 return 0;
11 }
12 RECORD_CONFLICT();
13 if (unlikely(timeout == K_NO_WAIT)) {
14 k_sched_unlock();
15 return -EBUSY;
16 }
17 new_prio = new_prio_for_inheritance(_current->base.prio,
mutex->owner->base.prio);
18 key = irq_lock();
19 K_DEBUG("adjusting prio up on mutex %p\n", mutex);
20 if (_is_prio_higher(new_prio, mutex->owner->base.prio)) {
21 adjust_owner_prio(mutex, new_prio);
22 }
23 _pend_current_thread(&mutex->wait_q, timeout);
24 int got_mutex = _Swap(key);
25 K_DEBUG("on mutex %p got_mutex value: %d\n", mutex, got_mutex);
26 K_DEBUG("%p got mutex %p (y/n): %c\n", _current, mutex,
got_mutex ? 'y' : 'n');
27 if (got_mutex == 0) {
28 k_sched_unlock();
29 return 0;
30 }
/* timed out */
31 K_DEBUG("%p timeout on mutex %p\n", _current, mutex);
32 struct k_thread *waiter =
(struct k_thread *)sys_dlist_peek_head(&mutex->wait_q);
33 new_prio = mutex->owner_orig_prio;
34 new_prio = waiter ? new_prio_for_inheritance(waiter->base.prio, new_prio) : new_prio;
35 K_DEBUG("adjusting prio down on mutex %p\n", mutex);
36 key = irq_lock();
37 adjust_owner_prio(mutex, new_prio);
38 irq_unlock(key);
39 k_sched_unlock();
40 return -EAGAIN;
}
2行目:ロック処理実施中に他スレッドを動作させないようにスケジューラのロックを獲得します。
3-11行目:まだ誰もmutexを獲得していない、またはオーナーが自分自身のときの処理です。この場合、mutexを獲得可能です。
5行目:owner_orig_prioを設定します。
まだ自分自身でmutexを獲得していない場合は獲得を試みたスレッドの優先度を設定、自分自身がすでにmutexを獲得している場合はowner_orig_prioを設定しています。
6行目:ロックカウントをインクリメントします。
7行目:ownerに_currentを設定します。
9-10行目;スケジューラのロックを開放し、本関数から復帰します。mutexを獲得していない、またはオーナーが自分自身の時はこれで獲得処理を完了します。
12行目以降が別のスレッドがすでに対象のmutexを獲得していた時の処理になります。
13-16行目:引数timeoutにK_NO_WAIT(待たないオプション)が指定されていた場合、スケジューラのロックを開放して即座に復帰します。なお、復帰値はリソースが使用されていたことを示すEBUSYになります。
17行目:優先度を継承する必要がある場合は継承した優先度をnew_prioに設定します。
ここが重要なポイントですので、new_prio_for_inheritance ( )のコードを以下に示します。
static int new_prio_for_inheritance(int target, int limit)
{
int new_prio = _is_prio_higher(target, limit) ? target : limit;
new_prio = _get_new_prio_with_ceiling(new_prio);
return new_prio;
}
targetが現在実行中のスレッド優先度、limitがmutexを獲得しているオーナースレッドの優先度です。_is_prio_higher ( )で現在実行中のスレッドの優先度の方が高い場合、現在実行中のスレッドの優先度をnew_prioに設定します。そうでない場合は、オーナースレッドの優先度を設定します。
そして_get_new_prio_with_ceiling ( )でコンフィグCONFIG_PRIORITY_CEILINGより低優先度の場合はnew_prioはそのまま、CONFIG_PRIORITY_CEILINGより高優先度の場合はCONFIG_PRIORITY_CEILINGをnew_prioに設定します。CONFIG_PRIORITY_CEILINGは上限値の役割をしています。(CONFIG_PRIORITY_CEILINGのデフォルト値は0)
_impl_k_mutex_lock ( )に話を戻します。
18行目:割込みを禁止します。
20-22行目:new_prioがmutexを保持しているオーナースレッドの優先度より高い場合、オーナースレッドの優先度を一時的にnew_prioにブーストします。
23行目:mutex獲得を待つために、現在のスレッドをreadyキューから外し、指定したタイムアウト値でウェイトキューに繋ぎます。
24行目:コンテキストスイッチします。これ以降の処理はオーナースレッドがクリティカルセクション実行し、mutexを解放した後、もしくは指定したタイムアウトを超過した場合に復帰します。
27-30行目:復帰値が0の時はスケジューラをアンロックして本関数から復帰します。
この処理を不思議に思う方もいると思います。新たなオーナースレッドの獲得処理をしていないのでは?と。当該処理は前のオーナースレッドがmutexを解放する処理で実施しているため、この処理で十分です。
31行目以降はタイムアウトした時の処理になります。基本的な処理は、対象mutexを待っている次のスレッドの優先度にブーストし直すことです。
32行目:対象のmutexを待っている次のスレッドをwaiterに設定します。
33行目:new_prioにmutexのowner_orig_prio(元の優先度)を設定します。
34行目:waiterが存在すれば、17行目と同じくnew_prio_for_inheritance ( )を用いて必要に応じてnew_prioを更新します。
36-38行目:割込みを禁止後、21行目同様にnew_prioがmutexを保持しているオーナーの優先度より高い場合、オーナーの優先度を一時的にnew_prioにブーストします。その後割込み状態を復元します。
39行目:スケジューラをアンロックします。
40行目:タイムアウト時の復帰値-EAGAINを返します。
24行目でオーナースレッドにスイッチすると、オーナースレッドのクリティカルセクション実行後のmutex解放処理と、次に対象mutexを獲得するスレッドの獲得処理を行うため、24行目以降はこの部分だけを見ても理解できない点もあると思います。
次にmutexの解放処理を見てみましょう。
mutexの解放処理
void _impl_k_mutex_unlock(struct k_mutex *mutex)
{
int key;
__ASSERT(mutex->lock_count > 0, "");
__ASSERT(mutex->owner == _current, "");
1 _sched_lock();
2 RECORD_STATE_CHANGE();
3 mutex->lock_count--;
4 K_DEBUG("mutex %p lock_count: %d\n", mutex, mutex->lock_count);
5 if (mutex->lock_count != 0) {
6 k_sched_unlock();
7 return;
8 }
9 key = irq_lock();
10 adjust_owner_prio(mutex, mutex->owner_orig_prio);
11 struct k_thread *new_owner = _unpend_first_thread(&mutex->wait_q);
12 K_DEBUG("new owner of mutex %p: %p (prio: %d)\n",
mutex, new_owner, new_owner ? new_owner->base.prio : -1000);
13 if (new_owner) {
14 _abort_thread_timeout(new_owner);
15 _ready_thread(new_owner);
16 irq_unlock(key);
17 _set_thread_return_value(new_owner, 0);
/*
* new owner is already of higher or equal prio than first
* waiter since the wait queue is priority-based: no need to
* ajust its priority
*/
18 mutex->owner = new_owner;
19 mutex->lock_count++;
20 mutex->owner_orig_prio = new_owner->base.prio;
21 } else {
22 irq_unlock(key);
23 mutex->owner = NULL;
24 }
25 k_sched_unlock();
}
1行目:スケジューラをロックします。
3行目:mutexのロック回数をデクリメントします。
5-8行目:ロックカウントが0でなければスケジューラロックを解放後、0で復帰します。自身が対象mutexを多重で獲得していた時はロックカウントが0にならないので本処理を実行します。
9行目:割込み禁止にします。
10行目:adjust_owner_prio ( )でmutexのオーナースレッドの優先度を元の優先度に戻します。
11行目:次のオーナースレッドnew_ownerを_unpend_first_thread ( )、ウェイトキューを用いて設定します。
13-21行目:オーナースレッドが存在する場合です。
14行目:タイムアウト付きでwaitキューで休眠していたため、new_ownerをキューから削除します。
15行目:new_ownerをreadyキューに繋ぎます。
18-20行目:mutexのowner、lock_count、owner_orig_prioをnew_owner用で設定します。(獲得処理と同等の処理を行います)
21-24行目:オーナースレッドが存在しない場合、割込み状態を復元し、mutexのオーナーにNULLを設定します。
25行目:スケジューラをアンロックします。
いかがでしたか。
優先度継承はリアルタイム応答性能に大きく影響するため、RTOSを語る上では欠かせない重要な機能です。
説明が抜けている、理解できない、などありましたらコメントください。
では、また。
前回:Zephyr入門(スケジューラ:ワークキュー編)
次回:Zephyr入門(排他制御:その他)
『各種製品名は、各社の製品名称、商標または登録商標です。本記事に記載されているシステム名、製品名には、必ずしも商標表示((R)、TM)を付記していません。』