個人的メモです
経緯
rubyの非同期タスクスケジューラーの#timeout_afterメソッドを実行したい。
https://docs.ruby-lang.org/en/master/Fiber/SchedulerInterface.html#method-i-timeout_after
このメソッドでは、「指定時間経過後に特定の処理を実行する」ことを非同期に行う。
(要は、指定時間が経つまでの間ブロックしないようにしたい)
この実装にあたり、以下のgemで実装されているschedlerの実装を参考にする(カンペする)ことにした。
このscheduler#timeout_after の実装は以下。
https://github.com/socketry/async/blob/main/lib/async/scheduler.rb#L283-L298
# Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
# @parameter duration [Numeric] The time in seconds, in which the task should complete.
def timeout_after(timeout, exception = TimeoutError, message = "execution expired", &block)
fiber = Fiber.current
timer = @timers.after(timeout) do
if fiber.alive?
fiber.raise(exception, message)
end
end
yield timer
ensure
timer.cancel if timer
end
end
このgemでは、タイマーを使用するイベントループの実行をtimers gem(作者同じ)に切り出している。
👉 timers gem を読むことにした。
実際に使ってみた
Timer::Group.newで、Timers::Groupを初期化する。
初期化されたtimersは、@events(@queue, @sequenceに入ったHandles)、@interval、@timers と @paused_timers をフィールドとして持つ。
Timers::Group#afterメソッドで、ブロックの実行を予約する。
引数として指定された時間より後に、初めてTimers::Group#fireメソッドを呼び出した時にそのブロックが実行されるようだ。
waitをすると次の一つ目のeventの実行が終了するまでブロックする。(実行するべきイベントがないときは即座にnilを返すようだ)
内部的には、Kernal#sleepを実行している。(👉task schedulerでは、このsleepを非同期に行なっているはず)
いまいち@timersが何なのか把握できていない
timers.afterなどで、新しくhandleを追加したときの返り値はTimer::Timerオブジェクト。
このtimerの#pause, #cancelなどを呼び出すことで、追加されたイベント(handle)の挙動の調整を行うことができる。
多分、ユーザー側にHandleは露出していなくて、基本的にtimerを介して登録されたイベントの調整を行う。
(TODO: ⏫なぜpauseしたのにfireで実行されるのだろう)
登場する主要モデル
-
Events
- https://github.com/socketry/timers/blob/master/lib/timers/events.rb#L28
- queueとpriority heapの二つのデータ構造の中に handleを入れている。
- 効率化のために分けている?
- #scheduleの際には、とりあえず@queue(PriorityHeap
)の中に入れて、#fireする際には、merge!によって、@queueのなかのhandleは全て@sequences(List)に移しているっぽい。
-
Events::Handle
- https://github.com/socketry/timers/blob/master/lib/timers/events.rb#L30
- Represents a cancellable handle for a specific timer event.
- event = handleか。
-
Timer
- https://github.com/socketry/timers/blob/master/lib/timers/timer.rb#L29
- #pause, #resume, #delay, #reset メソッドがある。
- @handle と結び付けられている。(一方でHandle -> timer の結び付けはされてなさそう)
-
Group
- https://github.com/socketry/timers/blob/master/lib/timers/group.rb#L32
- has many timers and many events and one interval
その他メモ
上述の通り、マルチスレッドでなんとかしているとかいう話ではない。イベントの発生が起こらないかをブロッキングしてチェックしている。