9
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

OSv : スケジューラの実装を覗いてみる

Posted at

ご挨拶

どうもです。うっかりAdvent Calendarに参加したakachochinです。
最近は、お仕事が結構アレなのと、早めに解放された場合のみに行ってしまうわけでして、投稿滞っております。

前置きはともかくとして、やはりOSを目の前にしてソース読まないといかんでしょ?ということでスケジューラを読んでみました。

※引用したソース内の日本語コメントは全て私が追記したものです。

スケジューラのソースを覗く

まず、目に付いたのが、idle threadを生成するためのメソッドです。
(OSvはC++で書かれていることを忘れちゃだめですよ)

core/sched.cc
void cpu::init_idle_thread()
{
     // idle threadを生成した時点にrunning_sinceを設定
    running_since = osv::clock::uptime::now();
    std::string name = osv::sprintf("idle%d", id);
     // idleの実体を作っています。threadクラスのインスタンスを
     // 生成することでスレッドができるようです。
     // 今はスレッドコンテキストを覗きみるのは割愛します。
    idle_thread = new thread([this] { idle(); }, thread::attr().pin(this).name(name));
    idle_thread->set_priority(thread::priority_idle);
}

さて、もう少し読み進めると、いよいよそれっぽいのが出てきます。

core/sched.cc
// Note that this is a static (class) function, which can only reschedule
// on the current CPU, not on an arbitrary CPU. Allowing to run one CPU's
// scheduler on a different CPU would be disastrous.
void cpu::schedule()
{
    // コメントの通り。自コアのスレッドのみスケジューリングできるようだ。
    WITH_LOCK(irq_lock) {
        current()->reschedule_from_interrupt();
    }
}

コメントにあるとおり、スケジューリングはあくまでも自コアのみを対象にしています。
他のコアまでをスケジューリング対象にするのは、確かに面倒ですし、やりたくないです。
また、やったとしても排他その他諸々で実装が非常にヤバくなるのが予想されます。

さて、先に進み、御本尊のreschedule_from_interrupt()をみます。

core/sched.cc
void cpu::reschedule_from_interrupt(bool called_from_yield,
                                    thread_runtime::duration preempt_after)
{
    // これは統計情報 or デバッグ向けのトレース機能
    trace_sched_sched();
    // 例外の中からスケジューリングされるのを防ぎたいのが意図だろう
    assert(sched::exception_depth <= 1);
    // もうここに来たからにはスケジューリングされるということ
    need_reschedule = false;
    // これは何だろう?気になる...
    handle_incoming_wakeups();

    // 現在時刻を求めて...
    auto now = osv::clock::uptime::now();
    // 前回スケジューリング時刻からの期間を求める
    auto interval = now - running_since;
    running_since = now;
    // え?!intervalが負になる時があるの?
    // よく読むと、ブート時の”特例的な”処理らしい。
    if (interval <= 0) {
        // During startup, the clock may be stuck and we get zero intervals.
        // To avoid scheduler loops, let's make it non-zero.
        // Also ignore backward jumps in the clock.
        interval = context_switch_penalty;
    }
    // 現在動作中のスレッドを取り出す
    thread* p = thread::current();

    // _detached_stateってなに?
    const auto p_status = p->_detached_state->st.load();
    // queuedってなんだろ?スレッドの状態遷移はどういうのがあるんだろう。
    assert(p_status != thread::status::queued);

    // 通算動作時間を追加
    p->_total_cpu_time += interval;
    // これ、何?
    p->_runtime.ran_for(interval);

threadの状態について

スレッドの状態は_detached_steteに記録されています。
_detached_steteを調べるためにthreadのコンストラクタ見ると、こいつは_detached_stateクラスのインスタンスだとわかります。

core/sched.cc
thread::thread(std::function<void ()> func, attr attr, bool main, bool app)
    : _func(func)
    , _runtime(thread::priority_default)
    , _detached_state(new detached_state(this))
    , _attr(attr)
    , _migration_lock_counter(0)
    , _id(0)
    , _cleanup([this] { delete this; })
    , _app(app)
    , _joiner(nullptr)

さて、_detached_stateはinclude/osv/sched.hhにあります。
このファイルの中のコメントには、スレッドのステートと状態遷移に関する情報が書かれています。
コメントそのままだと見辛いので表にしました。

State machine transition matrix

Initial Next Async? Event Notes
unstarted waiting sync start() followed by wake()
unstarted prestarted sync start() before scheduler startup
prestarted unstarted sync scheduler startup followed by start()
waiting waking async wake()
waiting running sync wait_until cancelled (predicate became true before context switch)
waiting sending_lock async wake_lock() used for ensuring the thread does not wake up while we call receive_lock()
sending_lock waking async mutex::unlock()
running waiting sync prepare_wait()
running queued sync context switch
running terminating sync destroy() thread function completion
queued running sync context switch
waking queued async scheduler poll of incoming thread wakeup queue
waking running sync thread pulls self out of incoming wakeup queue
terminating terminated async post context switch

この表から判断するに、assertで見ているqueuedは「実行可能状態」です。今回のソースリーディングでは多く目にすることになります。

さて、続きを読むことにします。

core/sched.cc
    // current threadが実行状態だった場合の処理
    // たいていの場合はこっちだろう。
    if (p_status == thread::status::running) {
        // The current thread is still runnable. Check if it still has the
        // lowest runtime, and update the timer until the next thread's turn.
         // 実行可能状態のスレッドがいない
         // つまり、次に動けるスレッドがいないということである
        if (runqueue.empty()) {
         // preemption_timerの実体は見ていないが、たぶん、定周期な
         // スケジューリングを目的とするタイマと思われる
         // もちろん、今動いているスレッドが引き続き動くことになると思われる
            preemption_timer.cancel();
            return;
         // 実行可能状態のスレッドはいるけど、このメソッドがyieldから
         // 呼ばれていない場合(CPU実行権を自発的に手放していない)だと思われる。
        } else if (!called_from_yield) {
         // 実行可能キューの先頭からスレッドを引っ張ってくる
            auto &t = *runqueue.begin();
         // get_local()は何?
            if (p->_runtime.get_local() < t._runtime.get_local()) {
                preemption_timer.cancel();
         // このruntimeって何だろう?
         // 次のif文見る限り、deltaには「次のpreemption_timerがexpireする
         // まで」、つまり次にスケジューリングするまでの時間が格納されるのだろう
         // それまでの間、引き続き現スレッドが実行状態となる。
                auto delta = p->_runtime.time_until(t._runtime.get_local());
                if (delta > 0) {
                    preemption_timer.set(now + delta);
                }
                return;
            }
        }
        // If we're here, p no longer has the lowest runtime. Before queuing
        // p, return the runtime it borrowed for hysteresis.
        // ここに来るということは、スレッド切り替えをするということ。
        // その基準はruntime。
        p->_runtime.hysteresis_run_stop();
        p->_detached_state->st.store(thread::status::queued);

        if (!called_from_yield) {
            enqueue(*p);
        }

        trace_sched_preempt();
        p->stat_preemptions.incr();
    } else {
        // p is no longer running, so we'll switch to a different thread.
        // Return the runtime p borrowed for hysteresis.
        p->_runtime.hysteresis_run_stop();
    }

ここまで見て来た処理で、スレッドスイッチするケースかどうかを決めるようです。
そして、どうやらその基準はget_local()というメソッドの戻り値にあるようです。
get_local()を確認しましょう。

スレッドスケジューリングアルゴリズム

get_local()メソッド自体は、単に_Rttというメンバ変数を返すだけです。

それよりも重要なのは、メソッドの上に書かれたコメントです。このコメントによると、戻り値として返しているのはスレッドのruntime(実行時間)ということです。
重要なのは、「lowest runtime will be run first」というところです。

include/osv/sched.hh
    // Get the thread's CPU-local runtime, a number used to sort the runqueue
    // on this CPU (lowest runtime will be run first). local runtime cannot be
    // compared between different different CPUs - see export_runtime().
    inline runtime_t get_local() const
    {
        return _Rtt;
    }

さらに、同ヘッダ内のコメントを読んでみます。

include/osv/sched.hh
// thread_runtime is used to maintain the scheduler's view of the thread's
// priority relative to other threads. It knows about a static priority of the
// thread (allowing a certain thread to get more runtime than another threads)
// and is used to maintain the "runtime" of each thread, a number which the
// scheduler uses to decide which thread to run next, and for how long.
// All methods of this class should be called only from within the scheduler.
// https://docs.google.com/document/d/1W7KCxOxP-1Fy5EyF2lbJGE2WuKmu5v0suYqoHas1jRM

かなり意訳すると以下のとおりでしょう。

スケジューラが他のスレッドに対する優先度をいつでも見られるように、thread_runtimeは使われます。thread_runtimeはスレッドの静的な優先度を示し(あるスレッドが他のスレッドよりも多くの実行時間を獲得できるようにする)、各スレッドの実行時間を維持管理するために使われ、そしてスケジューラが次にどのスレッドをどれだけの期間実行させるかの基準となります。
スケジューラ以外のコードから、このクラスのいかなるメソッドも呼び出すべきではない。

ここで、リンク先にあるgoogleのドキュメントを読みます。
概要をまとめると......(全部読みきれていないのは内緒だぞ)以下の通りです。

  • 直近の(一定の期間)における実行時間が最も少ない実行可能なスレッドをスケジューラは選び実行させる。(論文やコードでは、実行時間を「R」という文字で示している)
  • けれど、この原則を厳密に守り過ぎると過剰なコンテキストスイッチを招くため、hysteresisという概念がある。これはほぼ同じ実行時間を持つスレッドが複数いた場合に使うもので、実行権を獲得したスレッドがスイッチングなく実行される一定の期間のことである。
  • 厳密な実行時間の求め方を全てのスレッドに適用するのはコスト面から現実的でないため、簡略化したアルゴリズムを適用している。
  • ran_forやtime_untilというメソッドがある。これは簡略化したアルゴリズムに従い、直近の実行時間を計算するために必要なパラメータを求めるためのものである。

つまり、これはLinuxのCFSにかなり類似していることがわかります。
これにより、先に紹介した以下のコードは「次に実行可能なスレッドと現在のスレッドの実行時間を比較して、現在のスレッドの実行時間がまだ少ない場合は、次に実行可能なスレッドの実行時間に達するまで動作させる」ということを表すことがわかります。

core/sched.cc
            if (p->_runtime.get_local() < t._runtime.get_local()) {
                preemption_timer.cancel();
                auto delta = p->_runtime.time_until(t._runtime.get_local());
                if (delta > 0) {
                    preemption_timer.set(now + delta);
                }
                return;
            }

続き

最後まで一気に読みます。

core/sched.cc
    // スイッチするスレッドをキューから取り除く
    auto ni = runqueue.begin();
    auto n = &*ni;
    runqueue.erase(ni);

    // これは不明だが、CPUtimeの計算?要確認。
    n->cputime_estimator_set(now, n->_total_cpu_time);
    assert(n->_detached_state->st.load() == thread::status::queued);
    trace_sched_switch(n, p->_runtime.get_local(), n->_runtime.get_local());

    // yield経由で自発的にスイッチした場合、enqueueでキューに入れる。
    // 単に実行権を手放しただけで、いまだ実行可能状態。
    if (called_from_yield) {
        enqueue(*p);
    }


    if (n == idle_thread) {
        trace_sched_idle();
    } else if (p == idle_thread) {
        trace_sched_idle_ret();
    }
    n->stat_switches.incr();

    trace_sched_load(runqueue.size());

    // スレッドの状態をrunningにする    n->_detached_state->st.store(thread::status::running);
    // hysteresisとは最低限CPU実行権を握れる時間
    n->_runtime.hysteresis_run_start();

    assert(n!=p);

    // これは不明。切り替わり前のスレッドの状態を見ているが、何をしようとしているかはこれだけではわからない。
    if (p->_detached_state->st.load(std::memory_order_relaxed) == thread::status::queued
            && p != idle_thread) {
        n->_runtime.add_context_switch_penalty();
    }
    // 一度preemption_timerをキャンセルし、停止する
    preemption_timer.cancel();
    // yield経由でない場合
    if (!called_from_yield) {
    // 実行可能状態のスレッドが他にいる場合
        if (!runqueue.empty()) {
    // 次に実行可能状態になるスレッドをみる
            auto& t = *runqueue.begin();
    // 実行時間が次に実行可能状態になるスレッドと同じになるまで実行させる。(fairness)
            auto delta = n->_runtime.time_until(t._runtime.get_local());
            if (delta > 0) {
                preemption_timer.set(now + delta);
            }
        }
    // yield経由の場合は、preempt_afterで定義された時間経過したらexpireするように
    } else {
        preemption_timer.set(now + preempt_after);
    }

    // app_threadって何?
    if (app_thread.load(std::memory_order_relaxed) != n->_app) { // don't write into a cache line if it can be avoided
        app_thread.store(n->_app, std::memory_order_relaxed);
    }
    // これ、なんだろ?スイッチしたからTLB flushするのは何となくわかるが...
    if (lazy_flush_tlb.exchange(false, std::memory_order_seq_cst)) {
        mmu::flush_tlb_local();
    }
    // ようやっと、コンテキストスイッチ
    n->switch_to();
    if (p->_detached_state->_cpu->terminating_thread) {
        p->_detached_state->_cpu->terminating_thread->destroy();
        p->_detached_state->_cpu->terminating_thread = nullptr;
    }
}

スケジューリングアルゴリズムが分かったので、何となくやっていることは見えますね。
では、少し細かいところをみましょう。

細かいところを見る

incoming_wakeup

reschedule_from_interruptの最初の方に以下のコードがありました。

core/sched.cc
    // これは何だろう?気になる...
    handle_incoming_wakeups();

実はこの処理は、「実行可能状態のスレッドが追加でいる場合、自コアのrunqueueに該当スレッドを追加する」処理です。

「実行可能状態のスレッドが追加」される場合、そのスレッドはincoming_wakeupsキューに格納されます。
このキューにスレッドが格納されるのは、「スレッドがsleepingから起床する場合」もしくは「load_balance()により、スレッドが他のコアからマイグレーションされる場合」です。(詳細はincoming_wakeupsでgrepしてみるとわかります。ちなみにincoming_wakeupsはCPU依存のsmp_init()で初期化されます。)

core/sched.cc
void cpu::handle_incoming_wakeups()
{
    cpu_set queues_with_wakes{incoming_wakeups_mask.fetch_clear()};
    if (!queues_with_wakes) {
        return;
    }
    for (auto i : queues_with_wakes) {
        irq_save_lock_type irq_lock;
        WITH_LOCK(irq_lock) {
         // wakeupしたり、migrationしたりというイベントが発生した場合、そのスレッドはincoming_wakeupsキューにいる
            auto& q = incoming_wakeups[i];
          // incoming_wakeupsキューの中を漁る
            while (!q.empty()) {
           // 先頭にあるスレッドをひとつとる
                auto& t = q.front();
                q.pop_front();
           // 今動いているスレッドに対する例外処理
                if (&t == thread::current()) {
                    // Special case of current thread being woken before
                    // having a chance to be scheduled out.
           // schedule outされる前にwakeupするケース
                    t._detached_state->st.store(thread::status::running);
                } else {
            // 実行可能状態...キューイングする
                    t._detached_state->st.store(thread::status::queued);
                    // Make sure the CPU-local runtime measure is suitably
                    // normalized. We may need to convert a global value to the
                    // local value when waking up after a CPU migration, or to
                    // perform renormalizations which we missed while sleeping.
                    t._runtime.update_after_sleep();
                    enqueue(t);
            // XXX : ???
                    t.resume_timers();
                }
            }
        }
    }

    trace_sched_load(runqueue.size());
}

スケジューリングのためのメソッド呼び出しの確認

core/sched.cc
    p->_runtime.ran_for(interval);

先程のドキュメントのサマリでも登場したran_for。
ざっくりというと、今の実行時間から次の実行時間とそれを求める係数を計算するメソッドだ。
詳しくは論文を見るように....って、ざっくり読みしかしてないけどな(笑)

core/sched.cc
    n->cputime_estimator_set(now, n->_total_cpu_time);

これは、以下の通り、スレッドの通算CPU使用時間を計算しているようです。

core/sched.cc
// Estimating a *running* thread's total cpu usage (in thread::thread_clock())
// requires knowing a pair [running_since, cpu_time_at_running_since].

app_threadって何?

core/sched.cc
    if (app_thread.load(std::memory_order_relaxed) != n->_app) { // don't write into a cache line if it can be avoided
        app_thread.store(n->_app, std::memory_order_relaxed);
    }

grepしてみると、どうやらスレッドの属性の一つを示すbool値で、_appでgrepするといくつかのrubyのクラスも引っ掛けられる。
grepした結果に気になるところがあったので見てみよう。
なお、「coyp_if」については、このページを参考にされたい。

arch/aarch64/mmu.cc
void flush_tlb_all()
{
    static std::vector<sched::cpu*> ipis(sched::max_cpus);

    if (sched::cpus.size() <= 1) {
        mmu::flush_tlb_local();
        return;
    }

    SCOPE_LOCK(migration_lock);
    // たぶん、この処理を実行するCPUはここで即TLBをflushする
    mmu::flush_tlb_local();
    std::lock_guard<mutex> guard(tlb_flush_mutex);
    tlb_flush_waiter.reset(*sched::thread::current());
    int count;
    if (sched::thread::current()->is_app()) {
        ipis.clear();
        std::copy_if(sched::cpus.begin(), sched::cpus.end(), std::back_inserter(ipis),
                [](sched::cpu* c) {
         // 自身を動かしているCPUは除外
            if (c == sched::cpu::current()) {
                return false;
            }
         // 
            c->lazy_flush_tlb.store(true, std::memory_order_relaxed);
         // app_thread属性がついていないものは除外
            if (!c->app_thread.load(std::memory_order_seq_cst)) {
                return false;
            }
          // lazy_flush_tlb属性が元からfalseだった場合も除外
            if (!c->lazy_flush_tlb.exchange(false, std::memory_order_relaxed)) {
                return false;
            }
            return true;
        });
        count = ipis.size();
    } else {
        count = sched::cpus.size() - 1;
    }
    tlb_flush_pendingconfirms.store(count);
    if (count == (int)sched::cpus.size() - 1) {
        tlb_flush_ipi.send_allbutself();
    } else {
        for (auto&& c: ipis) {
            tlb_flush_ipi.send(c);
        }
    }
    sched::thread::wait_until([] {
            return tlb_flush_pendingconfirms.load() == 0;
    });
    tlb_flush_waiter.clear();
}

どうやら、app_thread属性のついているスレッドを動かしているCPUではスイッチングのときにTLB flushをする場合があるということだろう。(きっとアドレスの仮想 - 物理のエントリが変わる場合にapp_thread属性が有効にするのだろう)

逆にapp_threadがついていないCPUを動かす場合、アドレスの仮想 - 物理のエントリが変わらないのでTLB Flushする必要はないし、TLB Flushすることで却ってパフォーマンスにとって悪影響ということであろう。

実際に、reschedule_from_interrupt()の最後の方で以下のコードがある。

core/sched.cc
   if (lazy_flush_tlb.exchange(false, std::memory_order_seq_cst)) {
        mmu::flush_tlb_local();
    }

最後に

OSvのソースコードリーディング、なかなか楽しめました。
OSvのcoreの下で cat * | wc -l すると14000程度です。
一方でlinuxのkernelディレクトリの下で cat * | wc -l すると81000程度です。(サブディレクトリの下は含んでいません!)
なので、OSのソース読んでみたい人にとっても、全体像を把握しやすいのではないでしょうか。

では。

9
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?