はじめに
いまさらですが、Linuxの排他制御機構Read-Copy-Update (RCU)のコードを読んでみたいと思います。コードベースはLinux 4.3です。
LinuxのRCUのコードを読んでみる (rcu_read_{lock,unlock}編)の続きです。
RCUのsynchronize_rcu(およびcall_rcu)の実装には、通常版(CONFIG_TREE_RCU)と簡易版(CONFIG_TINY_CPU)があります。通常版はわりと複雑なので、まずはユニプロセッサ向けの簡易版を読んで雰囲気を掴みます。
synchronize_rcu
synchronize_rcuはRCUのキモとも呼べる機能を提供します。すなわち、すべてのreaderがクリティカルセクションを抜けるまで待ち合わせる機能です。
CONFIG_PREEMPT_RCUが有効になっていないカーネルでは、synchronize_rcuはsynchronize_schedを呼ぶだけです。
synchronize_sched
簡易版の場合は、kernel/rcu/tiny.cのsynchronize_schedになります。
void synchronize_sched(void)
{
RCU_LOCKDEP_WARN(lock_is_held(&rcu_bh_lock_map) ||
lock_is_held(&rcu_lock_map) ||
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_sched() in RCU read-side critical section");
cond_resched();
}
RCU_LOCKDEP_WARNはread側のクリティカルセクションで呼ばれていないかチェックしているだけです。
# define cond_resched() ({ \
___might_sleep(__FILE__, __LINE__, 0); \
_cond_resched(); \
})
cond_reschedは___might_sleepして、_cond_reschedするだけです。
___might_sleepはCONFIG_DEBUG_ATOMIC_SLEEPが有効のときは様々なassertionを実行したり、デバッグ出力したりする関数です。私の環境では無効なので、深追いはしません。
int __sched _cond_resched(void)
{
if (should_resched(0)) {
preempt_schedule_common();
return 1;
}
return 0;
}
_cond_reschedは、should_reschedがtrueであれば、preempt_schedule_commonを呼びます。
static __always_inline bool should_resched(int preempt_offset)
{
return unlikely(raw_cpu_read_4(__preempt_count) == preempt_offset);
}
x86の場合は、should_reschedは(引数が0なので)__preempt_countが0の場合にのみtrueになります。__preempt_countはCONFIG_PREEMPT_COUNTが有効の場合のみ0になり得ます。そしてCONFIG_PREEMPT_COUNTはCONFIG_PREEMPTが有効の場合のみ有効になるので、今回は0になることはありません。つまり、should_reschedがtrueになることはありません。preempt_countについてもっと知りたい場合は参考文献を参照してください。
結局、synchronize_rcuが何をしているかというと何もしていません。えーって感じですが、ユニプロセッサでかつ横取りがない環境では、synchronize_schedがスケジューリングされた段階で、readerはクリティカルセクションを抜けていることは保証されているというわけですね。
じゃあsynchronize_rcuは空の関数でも良いの?という疑問への回答は、RCU: The Bloatwatch Edition [LWN.net]にあるので気になる方は読んでみてください。
call_rcu
RCUのwriter側には、synchronize_rcuの他にcall_rcuというreaderがクリティカルセクションを抜けたら、登録したコールバックを呼んでくれるAPIも存在します。
なお後述しますが、コールバック関数はソフトIRQで実行されます。
データ構造
まずコールバック関数のためデータ構造を読んでいきます。
コールバック関数リストはRCU: The Bloatwatch Edition [LWN.net]に図があるので、それを見るとわかりやすいです。rcu_ctrlblkという構造体に、コールバック関数リストであるrcucblistがあります。この他に、すでに待ち合わせ期間1を経過したコールバック関数の最後の要素の次の要素を指すdonetail、リストの最後の要素の次の要素を指すcurtailがあります。
例えば、donetailとcurtailが一致していなければ、コールバック待ちの関数が存在することがわかります(rcu_qsctr_help関数参照)。また、donetailがrcucblistの先頭の要素と一致していれば、呼び出すべきコールバック関数がないことがわかります(__rcu_process_callbacks関数参照)。
動作
CONFIG_PREEMPTが有効でない場合は、call_rcuはcall_rcu_schedです。
void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
__call_rcu(head, func, &rcu_sched_ctrlblk);
}
static void __call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu),
struct rcu_ctrlblk *rcp)
{
unsigned long flags;
debug_rcu_head_queue(head);
head->func = func;
head->next = NULL;
local_irq_save(flags);
*rcp->curtail = head;
rcp->curtail = &head->next;
RCU_TRACE(rcp->qlen++);
local_irq_restore(flags);
if (unlikely(is_idle_task(current))) {
/* force scheduling for rcu_sched_qs() */
resched_cpu(0);
}
}
この関数は、コールバック関数リストに指定コールバック関数を追加して、実行中のタスクがアイドルタスクの場合、ランキューにあるタスクをスケジュールします。コメントにあるrcu_sched_qsは、いわゆるスケジューラのコア関数である__scheduleからrcu_note_context_switch経由で呼ばれます2。
rcu_sched_qsは、以下のように、コールバックすべき関数があれば(rcu_qsctr_helpが1以上を返せば3)、RCUのソフトIRQを実行するように設定します。
void rcu_sched_qs(void)
{
unsigned long flags;
local_irq_save(flags);
if (rcu_qsctr_help(&rcu_sched_ctrlblk) +
rcu_qsctr_help(&rcu_bh_ctrlblk))
raise_softirq(RCU_SOFTIRQ);
local_irq_restore(flags);
}
rcu_qsctr_helpがどうなっているかというと、前述の通り、donetailとcurtailを比較しているだけです。
static int rcu_qsctr_help(struct rcu_ctrlblk *rcp)
{
RCU_TRACE(reset_cpu_stall_ticks(rcp));
if (rcp->donetail != rcp->curtail) {
rcp->donetail = rcp->curtail;
return 1;
}
return 0;
}
ここで見落としてはいけないのは、donetail = curtailしている点です。これは、リストに存在するコールバック関数をすべてコールバックするように設定しているという意味です。つまり、rcu_sched_qsに来た段階ですべてのコールバック関数は待ち合わせ期間を経過したとみなしています。これはsynchronize_rcuと同じく、スケジューラが起動した=クリティカルセクションを抜けた、見なせるからです。
RCUのソフトIRQには、rcu_process_callbacks関数が設定されているので、それを読んでいきます。
rcu_process_callbacks
処理の本体は__rcu_process_callbacksにあります。
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp)
{
const char *rn = NULL;
struct rcu_head *next, *list;
unsigned long flags;
RCU_TRACE(int cb_count = 0);
/* Move the ready-to-invoke callbacks to a local list. */
local_irq_save(flags);
if (rcp->donetail == &rcp->rcucblist) {
/* No callbacks ready, so just leave. */
local_irq_restore(flags);
return;
}
RCU_TRACE(trace_rcu_batch_start(rcp->name, 0, rcp->qlen, -1));
list = rcp->rcucblist;
rcp->rcucblist = *rcp->donetail;
*rcp->donetail = NULL;
if (rcp->curtail == rcp->donetail)
rcp->curtail = &rcp->rcucblist;
rcp->donetail = &rcp->rcucblist;
local_irq_restore(flags);
/* Invoke the callbacks on the local list. */
RCU_TRACE(rn = rcp->name);
while (list) {
next = list->next;
prefetch(next);
debug_rcu_head_unqueue(list);
local_bh_disable();
__rcu_reclaim(rn, list);
local_bh_enable();
list = next;
RCU_TRACE(cb_count++);
}
RCU_TRACE(rcu_trace_sub_qlen(rcp, cb_count));
RCU_TRACE(trace_rcu_batch_end(rcp->name,
cb_count, 0, need_resched(),
is_idle_task(current),
false));
}
少し長いですが、やってることは、rcucblistにあるコールバック関数のうち、donetailまでのものをローカル変数のリストに移して、1つずつ実行しているだけです。分かりにくいですが関数の実行は__rcu_reclaimの中でやっています。
rcu_sched_qsの呼ばれる場所
先ほどは、__call_rcuからすぐにrcu_sched_qsが呼ばれる特殊なケースを読みましたが、すぐに呼ばれない場合はどこから呼ばれるのでしょうか?答えはtick(タイマ割り込み)です。
アーキテクチャによって違うようですが、x86だとtick_periodic/tick_sched_handle→update_process_times→rcu_check_callbacks→rcu_sched_qsという流れで呼ばれるようです。
終わりに
RCUのsynchronize_rcu(およびcall_rcu)の実装の簡易版を読んでみました。RCUは、その性質上、タスクスケジューリングに依存している部分があるので、前提知識がいろいろ必要で大変でした。
次回はより複雑な通常版のsynchronize_rcuとcall_rcuを読んでみたいです。
参考文献
-
RCU: The Bloatwatch Edition [LWN.net]
- ちょっと実装が古いので参考になるようなならないような感じです。
-
プリエンプション - Linuxカーネルメモ
- preempt_countについて詳しい説明がある。