#はじめに
いまさらですが、Linuxの排他制御機構Read-Copy-Update (RCU)のコードを読んでみたいと思います。コードベースはLinux 4.3です。
LinuxのRCUのコードを読んでみる (rcu_read_{lock,unlock}編)の続きです。
RCUのsynchronize_rcu
(およびcall_rcu
)の実装には、通常版(CONFIG_TREE_RCU
)と簡易版(CONFIG_TINY_CPU
)があります。通常版はわりと複雑なので、まずはユニプロセッサ向けの簡易版を読んで雰囲気を掴みます。
#synchronize_rcu
synchronize_rcu
はRCUのキモとも呼べる機能を提供します。すなわち、すべてのreaderがクリティカルセクションを抜けるまで待ち合わせる機能です。
CONFIG_PREEMPT_RCU
が有効になっていないカーネルでは、synchronize_rcu
はsynchronize_sched
を呼ぶだけです。
##synchronize_sched
簡易版の場合は、kernel/rcu/tiny.c
のsynchronize_sched
になります。
void synchronize_sched(void)
{
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_sched() in RCU read-side critical section");
cond_resched();
}
RCU_LOCKDEP_WARN
はread側のクリティカルセクションで呼ばれていないかチェックしているだけです。
#define cond_resched() ({ \
___might_sleep(__FILE__, __LINE__, 0); \
_cond_resched(); \
})
cond_resched
は___might_sleep
して、_cond_resched
するだけです。
___might_sleep
はCONFIG_DEBUG_ATOMIC_SLEEP
が有効のときは様々なassertionを実行したり、デバッグ出力したりする関数です。私の環境では無効なので、深追いはしません。
int __sched _cond_resched(void)
{
if (should_resched(0)) {
preempt_schedule_common();
return 1;
}
return 0;
}
_cond_resched
は、should_resched
がtrueであれば、preempt_schedule_common
を呼びます。
static __always_inline bool should_resched(int preempt_offset)
{
return unlikely(raw_cpu_read_4(__preempt_count) == preempt_offset);
}
x86の場合は、should_resched
は(引数が0なので)__preempt_count
が0の場合にのみtrueになります。__preempt_count
はCONFIG_PREEMPT_COUNT
が有効の場合のみ0になり得ます。そしてCONFIG_PREEMPT_COUNT
はCONFIG_PREEMPT
が有効の場合のみ有効になるので、今回は0になることはありません。つまり、should_resched
がtrueになることはありません。preempt_countについてもっと知りたい場合は参考文献を参照してください。
結局、synchronize_rcu
が何をしているかというと何もしていません。えーって感じですが、ユニプロセッサでかつ横取りがない環境では、synchronize_sched
がスケジューリングされた段階で、readerはクリティカルセクションを抜けていることは保証されているというわけですね。
じゃあsynchronize_rcu
は空の関数でも良いの?という疑問への回答は、RCU: The Bloatwatch Edition [LWN.net]にあるので気になる方は読んでみてください。
#call_rcu
RCUのwriter側には、synchronize_rcu
の他にcall_rcu
というreaderがクリティカルセクションを抜けたら、登録したコールバックを呼んでくれるAPIも存在します。
なお後述しますが、コールバック関数はソフトIRQで実行されます。
##データ構造
まずコールバック関数のためデータ構造を読んでいきます。
コールバック関数リストはRCU: The Bloatwatch Edition [LWN.net]に図があるので、それを見るとわかりやすいです。rcu_ctrlblk
という構造体に、コールバック関数リストであるrcucblist
があります。この他に、すでに待ち合わせ期間1を経過したコールバック関数の最後の要素の次の要素を指すdonetail
、リストの最後の要素の次の要素を指すcurtail
があります。
例えば、donetail
とcurtail
が一致していなければ、コールバック待ちの関数が存在することがわかります(rcu_qsctr_help
関数参照)。また、donetail
がrcucblist
の先頭の要素と一致していれば、呼び出すべきコールバック関数がないことがわかります(__rcu_process_callbacks
関数参照)。
##動作
CONFIG_PREEMPT
が有効でない場合は、call_rcu
はcall_rcu_sched
です。
void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
__call_rcu(head, func, &rcu_sched_ctrlblk);
}
static void __call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu),
struct rcu_ctrlblk *rcp)
{
unsigned long flags;
debug_rcu_head_queue(head);
head->func = func;
head->next = NULL;
local_irq_save(flags);
*rcp->curtail = head;
rcp->curtail = &head->next;
RCU_TRACE(rcp->qlen++);
local_irq_restore(flags);
if (unlikely(is_idle_task(current))) {
/* force scheduling for rcu_sched_qs() */
resched_cpu(0);
}
}
この関数は、コールバック関数リストに指定コールバック関数を追加して、実行中のタスクがアイドルタスクの場合、ランキューにあるタスクをスケジュールします。コメントにあるrcu_sched_qs
は、いわゆるスケジューラのコア関数である__schedule
からrcu_note_context_switch
経由で呼ばれます2。
rcu_sched_qs
は、以下のように、コールバックすべき関数があれば(rcu_qsctr_help
が1以上を返せば3)、RCUのソフトIRQを実行するように設定します。
void rcu_sched_qs(void)
{
unsigned long flags;
local_irq_save(flags);
if (rcu_qsctr_help(&rcu_sched_ctrlblk) +
rcu_qsctr_help(&rcu_bh_ctrlblk))
raise_softirq(RCU_SOFTIRQ);
local_irq_restore(flags);
}
rcu_qsctr_help
がどうなっているかというと、前述の通り、donetail
とcurtail
を比較しているだけです。
static int rcu_qsctr_help(struct rcu_ctrlblk *rcp)
{
RCU_TRACE(reset_cpu_stall_ticks(rcp));
if (rcp->donetail != rcp->curtail) {
rcp->donetail = rcp->curtail;
return 1;
}
return 0;
}
ここで見落としてはいけないのは、donetail = curtail
している点です。これは、リストに存在するコールバック関数をすべてコールバックするように設定しているという意味です。つまり、rcu_sched_qs
に来た段階ですべてのコールバック関数は待ち合わせ期間を経過したとみなしています。これはsynchronize_rcu
と同じく、スケジューラが起動した=クリティカルセクションを抜けた、見なせるからです。
RCUのソフトIRQには、rcu_process_callbacks
関数が設定されているので、それを読んでいきます。
##rcu_process_callbacks
処理の本体は__rcu_process_callbacks
にあります。
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp)
{
const char *rn = NULL;
struct rcu_head *next, *list;
unsigned long flags;
RCU_TRACE(int cb_count = 0);
/* Move the ready-to-invoke callbacks to a local list. */
local_irq_save(flags);
if (rcp->donetail == &rcp->rcucblist) {
/* No callbacks ready, so just leave. */
local_irq_restore(flags);
return;
}
RCU_TRACE(trace_rcu_batch_start(rcp->name, 0, rcp->qlen, -1));
list = rcp->rcucblist;
rcp->rcucblist = *rcp->donetail;
*rcp->donetail = NULL;
if (rcp->curtail == rcp->donetail)
rcp->curtail = &rcp->rcucblist;
rcp->donetail = &rcp->rcucblist;
local_irq_restore(flags);
/* Invoke the callbacks on the local list. */
RCU_TRACE(rn = rcp->name);
while (list) {
next = list->next;
prefetch(next);
debug_rcu_head_unqueue(list);
local_bh_disable();
__rcu_reclaim(rn, list);
local_bh_enable();
list = next;
RCU_TRACE(cb_count++);
}
RCU_TRACE(rcu_trace_sub_qlen(rcp, cb_count));
RCU_TRACE(trace_rcu_batch_end(rcp->name,
cb_count, 0, need_resched(),
is_idle_task(current),
false));
}
少し長いですが、やってることは、rcucblist
にあるコールバック関数のうち、donetail
までのものをローカル変数のリストに移して、1つずつ実行しているだけです。分かりにくいですが関数の実行は__rcu_reclaim
の中でやっています。
##rcu_sched_qsの呼ばれる場所
先ほどは、__call_rcu
からすぐにrcu_sched_qs
が呼ばれる特殊なケースを読みましたが、すぐに呼ばれない場合はどこから呼ばれるのでしょうか?答えはtick(タイマ割り込み)です。
アーキテクチャによって違うようですが、x86だとtick_periodic
/tick_sched_handle
→update_process_times
→rcu_check_callbacks
→rcu_sched_qs
という流れで呼ばれるようです。
#終わりに
RCUのsynchronize_rcu(およびcall_rcu)
の実装の簡易版を読んでみました。RCUは、その性質上、タスクスケジューリングに依存している部分があるので、前提知識がいろいろ必要で大変でした。
次回はより複雑な通常版のsynchronize_rcu
とcall_rcu
を読んでみたいです。
#参考文献
-
RCU: The Bloatwatch Edition [LWN.net]
- ちょっと実装が古いので参考になるようなならないような感じです。
-
プリエンプション - Linuxカーネルメモ
- preempt_countについて詳しい説明がある。