LoginSignup
55
29

More than 5 years have passed since last update.

Goでスケーラブルなスケジューラを書いた

Last updated at Posted at 2018-07-02

既存のGoで実装されたタスクスケジューラがスケーラブルでなかったので、スケーラブルなスケジューラライブラリを作りました。

ライブラリのURLは、https://github.com/kawasin73/htask です。

以前「Go言語でスヌーズ機能付きリマインダーLINE botを実装した」で作ったうさちゃんの ここ で利用しています。

既存のスケジューラライブラリ

github でスターが付いているスケジューラのライブラリは以下のようなものを見つけました。

これらはいずれも便利なインターフェイスを提供していますが、多くのタスクを登録された時にパフォーマンス上の問題を抱えています。

github.com/carlescere/scheduler

URL: https://github.com/carlescere/scheduler

このライブラリは、以下のような使いやすいインターフェイスを備え、タスクのキャンセルや即時実行ができる機能を持っているのが特徴です。

    // Run every 2 seconds but not now.
    scheduler.Every(2).Seconds().NotImmediately().Run(job)

    // Run now and every X.
    scheduler.Every(5).Minutes().Run(job)
    scheduler.Every().Day().Run(job)
    scheduler.Every().Monday().At("08:30").Run(job)

このライブラリでは、この部分の実装 で1つのタスクごとに1つの goroutine を立ち上げて時間とキャンセルを監視しています。そして、タスクの実行ごとにさらに goroutine を立ち上げて実行しています。

この実装では、タスクの数だけ goroutine が同時に生成されるため メモリ効率が悪い です。goroutine は軽量なグリーンスレッドとはいえ、1つあたり 2KB 以上のメモリを消費します。もし、100万タスクを追加すると 2GB のメモリが必要になり、さらにタスクの実行のためにも goroutine のためのメモリが必要となります。

github.com/jasonlvhit/gocron

URL: https://github.com/jasonlvhit/gocron

このライブラリも以下のような使いやすいインターフェイスを提供していることが特徴です。

    // Do jobs with params
    gocron.Every(1).Second().Do(taskWithParams, 1, "hello")

    // Do jobs without params
    gocron.Every(1).Second().Do(task)
    gocron.Every(2).Seconds().Do(task)
    gocron.Every(1).Minute().Do(task)
    gocron.Every(2).Minutes().Do(task)

    // function At() take a string like 'hour:min'
    gocron.Every(1).Day().At("10:30").Do(task)
    gocron.Every(1).Monday().At("18:30").Do(task)

このライブラリでは、この部分の実装 で、タスクを監視する1つの goroutine を立ち上げ、1秒おきに監視しています。そして、この部分の実装 で登録された全てのタスクを線形探索して、実行可能なタスクを抽出し実行しています。

この実装では、1秒おきに実行可能なタスクを検査しているので、1秒未満のスパンのタスクに対応できません。また、毎秒全てのタスクを線形探索しているので、多くのタスクを追加されると パフォーマンスが O(N) で悪く なります。

github.com/rakanalh/scheduler

URL: https://github.com/rakanalh/scheduler

このライブラリは、登録されたタスクを sqlite などのデータベースに永続化することができる特徴があります。

このライブラリでは github.com/jasonlvhit/gocron と同じように、この部分の実装 でタスクを監視する1つの goroutine を立ち上げ1秒おきに監視し、この部分の実装 で1秒おきに全てのタスクを線形探索し、実行可能なタスクを新しい goroutine を立ち上げて実行しています。

この実装では、github.com/rakanalh/scheduler と同じ、1秒未満のスパンのタスクに対応できない課題と、パフォーマンスが O(N) で悪くなる 課題を持っています。あと、読んでいるとスレッドセーフでない実装がちらほらありました。

課題を解決するアーキテクチャと機能

これらのライブラリは、デベロッパーに対するインターフェイスに重きを置いていてパフォーマンスの重要度は低いのかもしれませんが、今回は、パフォーマンスにも配慮した実装をしました。

htask.png

タスク

タスクはキャンセル可能です。

スケジューラ

1つの goroutine をスケジューラとして立ち上げ、タスクの優先度付きキューへ追加 と、一番近い時間に実行する 1つのタスクの監視 をシングルスレッドで行います。
タスクの優先度付きキューは、最小ヒープ というデータ構造で実現され、実行する時刻のタイムスタンプが一番小さいものを O(1) で取得でき、タスクの追加と削除を O(logN) で行えます。

タスクの追加と監視、後述のワーカーへの通知は全て チャネル を使って行い、select を使って同時に待ちます。

実装は この部分 です。

ワーカー

スケジューラの作成と同時に複数の ワーカーgoroutine を立ち上げ待機させます。実行時刻になったタスクがチャネル経由で ワーカーgoroutine に配給され実行されます。

ワーカーgoroutine を事前に立ち上げることで、同時実行されるタスクの量を制限することができます。ワーカーの処理が詰まると時間になってもタスクが実行されない可能性があるというデメリットがありますが、goroutine の数が限定されるため、goroutine で使うメモリ量の上限を制限できる というメリットがあります。

もし、同時実行されるタスクの数が多くなることが予想されるのであれば、ワーカーの数を多めに設定する必要があります。

また、ワーカー数を 0 に設定した時に、ワーカーは利用せずに タスクの実行ごとに goroutine を立ち上げ実行することもできるオプション を用意しました。これにより、使用メモリ量の上限の管理はタスクを追加する側の責任になってしまいますが、I/Oにより多くの時間を使うようなタスクが多い場合、goroutine により簡単に多重化できる特性を存分に使えるようになります。

使い方と便利なインターフェイス

このスケジューラライブラリのインターフェースは以下の通りです。

  • func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler
    • スケジューラの作成を行います。作成した時点で自動的に スケジューラ goroutineワーカー goroutine が起動されます。立ち上げられた goroutine の数だけ、sync.WaitGroup にカウントされます。
  • func (s *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error
    • キャンセルを通知するチャネルと、実行する時間、実行する関数(タスク)を渡します。
    • キャンセルを通知するチャネルは nil でも可能です。その場合はキャンセルはできなくなります。context パッケージの context.Context.Done() を使えることを意識しています。
  • func (s *Scheduler) ChangeWorkers(workers int) error
    • ワーカー数はスケジューラのダウンタイムなく動的に変更可能です。
  • func (s *Scheduler) Close() error
    • スケジューラ goroutineワーカー goroutine を終了させます。未実行タスクは全て破棄されます。NewScheduler で渡した sync.WaitGroup を待つことで全ての goroutine の終了を知ることができます。

github.com/kawasin73/htask/cron

このスケジューラライブラリは洗練されたインターフェイスにしていますが、普通のデベロッパからは使いにくいかもしれないため、スケジューラをいい感じにラップする cron.Cron というコンポーネントを作り、ヒューマンフレンドリーなインターフェイスを提供しました。

簡単な使い方は以下の通りです。

    // executed every 10:11 AM.
    c.Every(1).Day().At(10, 11).Run(task)

    // task will be executed in every 1 minute from now.
    c.Every(1).Minute().Run(task)

    tenSecondsLater := time.Now().Add(10 * time.Second)
    // executed in every 2 seconds started from 10 seconds later.
    cancel, err := c.Every(2).Second().From(tenSecondsLater).Run(task)
    if err != nil {
        // handle error
    }

    // cron can schedule one time task.
    c.Once(tenSecondsLater.Add(time.Minute)).Run(func() {
        // task can be cancelled.
        cancel()
    })

実装

スケジューラの部分の実装を記載します。全ての実装は、ソースコード を参照してください。

scheduler.go
type job struct {
    chCancel <-chan struct{}
    t        time.Time
    task     func(time.Time)
}

// Scheduler is used to schedule tasks.
type Scheduler struct {
    chClose   chan struct{}
    wg        *sync.WaitGroup
    chJob     chan job
    chWork    chan job
    chFin     chan struct{}
    chWorkers chan int
    wNum      int
}

// Set enqueue new task to scheduler heap queue.
// task will be cancelled by closing chCancel. chCancel == nil is acceptable.
func (c *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error {
    if t.IsZero() {
        return ErrInvalidTime
    } else if task == nil {
        return ErrInvalidTask
    }
    select {
    case <-c.chClose:
        return ErrClosed
    case <-chCancel:
        return ErrTaskCancelled
    case c.chJob <- job{chCancel: chCancel, t: t, task: task}:
        return nil
    }
}

type scheduleState struct {
    heap    *minHeap
    job     job
    chWork  chan<- job
    timer   *time.Timer
    expired bool // timer is expired or not
}

func newScheduleState(heapSize int) *scheduleState {
    timer := time.NewTimer(time.Second)
    if !timer.Stop() {
        <-timer.C
    }
    return &scheduleState{
        heap:    newMinHeap(heapSize),
        timer:   timer,
        expired: true,
    }
}

func (s *scheduleState) add(newJob job) error {
    if err := s.heap.add(newJob); err != nil {
        return err
    }
    if !s.expired && !s.timer.Stop() {
        <-s.timer.C
    }
    s.job = s.heap.peek()
    s.chWork = nil
    // s.job must not be empty
    s.timer.Reset(s.job.t.Sub(time.Now()))
    s.expired = false
    return nil
}

func (s *scheduleState) next() {
    if !s.expired && !s.timer.Stop() {
        <-s.timer.C
    }
    _ = s.heap.pop()
    s.job = s.heap.peek()
    s.chWork = nil
    if s.job.t.IsZero() {
        s.expired = true
    } else {
        s.timer.Reset(s.job.t.Sub(time.Now()))
        s.expired = false
    }
}

func (s *scheduleState) time(t time.Time, chWork chan<- job) {
    s.expired = true
    s.chWork = chWork
    s.job.t = t
}

func (c *Scheduler) scheduler(wg *sync.WaitGroup, workers int) {
    defer wg.Done()
    // no limited min heap
    // TODO: use limited heap
    state := newScheduleState(0)
    for {
        select {
        case <-c.chClose:
            return
        case workers = <-c.chWorkers:
            if workers == 0 && state.chWork != nil {
                go state.job.task(state.job.t)
                state.next()
            }
        case newJob := <-c.chJob:
            if err := state.add(newJob); err != nil {
                // TODO: heap is unlimited then no error will occur
                panic(err)
            }
        case <-state.job.chCancel:
            state.next()
        case t := <-state.timer.C:
            state.time(t, c.chWork)
            if workers == 0 {
                go state.job.task(state.job.t)
                state.next()
            }
        case state.chWork <- state.job:
            state.next()
        }
    }
}

func (c *Scheduler) worker(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-c.chClose:
            return
        case <-c.chFin:
            return
        case j := <-c.chWork:
            j.task(j.t)
        }
    }
}

func (c *Scheduler) scheduler(wg *sync.WaitGroup, workers int)

スケジューラのキモはここなので、ここの解説をします。

case <-c.chClosecase workers = <-c.chWorkers は、終了検知とワーカ数の変更検知をしています。

case newJob := <-c.chJob では、Set()によって追加された新しいタスクをヒープに保存し、最小タイムスタンプのタスク(以下、最小タスクと呼びます)を更新しています。

スケジューラでは、state.job に最小タイムスタンプのタスクを保持し、その状態を見ながら操作を行います。

最小タスクは state.job に保存され、Empty : ジョブがないPending : 待ちCancelled : キャンセルされたExpired : 時間がきたSent to Worker : ワーカーに渡されたの5状態をもち、スケジューラはステートマシンのように振る舞います。

htask-state.png

case newJob := <-c.chJob によって、最小タスクは 待ち 状態になります。

キャンセルされた最小タスクは、case <-state.job.chCancel によって キャンセルされた 状態に移り、次の最小タスクが state.job待ち 状態となって設定されます。

待ち 最小タスクは、case t := <-state.timer.C によって、時間がきた 状態に移ります。この状態は、state.chWorknil からチャネルを設定されることによって表させれます。もし、つど goroutine を立ち上げるオプションが有効(workers == 0)な場合は、その場でワーカーを起動し、次の最小タスクを 待ち 状態で設定します。

時間がきた 最小タスクは、case state.chWork <- state.job によって ワーカーに渡されたに移り、次の最小タスクが state.job待ち 状態となって設定されます。

ジョブがない 状態の場合は、state.timer.C はリセットされた状態で、state.job.chCancelnil になるため、タスクの追加がされるまでこのステートマシンは停止します。

この実装の課題と展望

この実装では、以下の課題があります。

キャンセルされたタスクを即座に最小ヒープから削除できません。キャンセルの検知はそのタスクが最小タスクになるまで行われないため、ヒープの容量を無駄に利用することになります。解決策としては、定期的にヒープ内に保管されているタスクを全て終了チェックすることですが、めんどくさかったので今回は実装していません。

タスク数の上限を設定できません。最小ヒープに保存されるタスクの数を制限すれば実現できるのですが、タスク数の上限に達した時に、Set() インターフェイスにタスク数の上限を伝える仕組みのいい実装が思い浮かばなかったので保留しています。Set() で都度タイマーチャネルを生成してタイムアウトさせるか、sync.Mutex で保護された状態をSet() 内でチェックするか(根本的な解決になっていない)という方法が考えつきましたが、もっといい方法がありそうなので保留してます。ちなみに、Set()できるまでずっと待たせるとワーカーの中から追加した時にデッドロックを起こすので、失敗させる必要があります。

課題ではありませんが、スケジューラのマルチスレッド化も可能です。スケジューラは複数同時並列に動いても問題ない実装なので複数の スケジューラ goroutine を立ち上げることもできますが、めんどくさかったし必要もなさそうなので実装していません。

最後に

うさちゃん使ってるの数人なので、スケーラブルなスケジューラは必要ないのですが、思いついてしまったので作りました。楽しかったです。

55
29
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
55
29