1. kawasin73

    No comment

    kawasin73
Changes in tags
Changes in body
Source | HTML | Preview
@@ -1,319 +1,322 @@
既存のGoで実装されたタスクスケジューラがスケーラブルでなかったので、スケーラブルなスケジューラライブラリを作りました。
ライブラリのURLは、https://github.com/kawasin73/htask です。
以前「[Go言語でスヌーズ機能付きリマインダーLINE botを実装した](https://qiita.com/kawasin73/items/c8d0fc7ede6036e5ce39)」で作ったうさちゃんの [ここ](https://github.com/kawasin73/usa-reminder/blob/master/scheduler.go) で利用しています。
## 既存のスケジューラライブラリ
github でスターが付いているスケジューラのライブラリは以下のようなものを見つけました。
- [`github.com/carlescere/scheduler`](https://github.com/carlescere/scheduler)
- [`github.com/jasonlvhit/gocron`](https://github.com/jasonlvhit/gocron)
- [`github.com/rakanalh/scheduler`](https://github.com/rakanalh/scheduler)
これらはいずれも便利なインターフェイスを提供していますが、多くのタスクを登録された時にパフォーマンス上の問題を抱えています。
### `github.com/carlescere/scheduler`
URL: https://github.com/carlescere/scheduler
このライブラリは、以下のような使いやすいインターフェイスを備え、タスクのキャンセルや即時実行ができる機能を持っているのが特徴です。
```go
// Run every 2 seconds but not now.
scheduler.Every(2).Seconds().NotImmediately().Run(job)
// Run now and every X.
scheduler.Every(5).Minutes().Run(job)
scheduler.Every().Day().Run(job)
scheduler.Every().Monday().At("08:30").Run(job)
```
このライブラリでは、[この部分の実装](https://github.com/carlescere/scheduler/blob/master/scheduler.go#L160-L188) で1つのタスクごとに1つの goroutine を立ち上げて時間とキャンセルを監視しています。そして、タスクの実行ごとにさらに goroutine を立ち上げて実行しています。
この実装では、タスクの数だけ goroutine が同時に生成されるため **メモリ効率が悪い** です。goroutine は軽量なグリーンスレッドとはいえ、1つあたり `2KB` 以上のメモリを消費します。もし、100万タスクを追加すると `2GB` のメモリが必要になり、さらにタスクの実行のためにも goroutine のためのメモリが必要となります。
### `github.com/jasonlvhit/gocron`
URL: https://github.com/jasonlvhit/gocron
このライブラリも以下のような使いやすいインターフェイスを提供していることが特徴です。
```go
// Do jobs with params
gocron.Every(1).Second().Do(taskWithParams, 1, "hello")
// Do jobs without params
gocron.Every(1).Second().Do(task)
gocron.Every(2).Seconds().Do(task)
gocron.Every(1).Minute().Do(task)
gocron.Every(2).Minutes().Do(task)
// function At() take a string like 'hour:min'
gocron.Every(1).Day().At("10:30").Do(task)
gocron.Every(1).Monday().At("18:30").Do(task)
```
このライブラリでは、[この部分の実装](https://github.com/jasonlvhit/gocron/blob/master/gocron.go#L479-L497) で、タスクを監視する1つの goroutine を立ち上げ、1秒おきに監視しています。そして、[この部分の実装](https://github.com/jasonlvhit/gocron/blob/master/gocron.go#L395-L410) で登録された全てのタスクを線形探索して、実行可能なタスクを抽出し実行しています。
この実装では、1秒おきに実行可能なタスクを検査しているので、**1秒未満のスパンのタスクに対応できません**。また、毎秒全てのタスクを線形探索しているので、多くのタスクを追加されると **パフォーマンスが `O(N)` で悪く** なります。
### `github.com/rakanalh/scheduler`
URL: https://github.com/rakanalh/scheduler
このライブラリは、登録されたタスクを `sqlite` などのデータベースに永続化することができる特徴があります。
このライブラリでは `github.com/jasonlvhit/gocron` と同じように、[この部分の実装](https://github.com/rakanalh/scheduler/blob/master/scheduler.go#L78-L106) でタスクを監視する1つの goroutine を立ち上げ1秒おきに監視し、[この部分の実装](https://github.com/rakanalh/scheduler/blob/master/scheduler.go#L195-L206) で1秒おきに全てのタスクを線形探索し、実行可能なタスクを新しい goroutine を立ち上げて実行しています。
この実装では、`github.com/rakanalh/scheduler` と同じ、**1秒未満のスパンのタスクに対応できない**課題と、**パフォーマンスが `O(N)` で悪くなる** 課題を持っています。あと、読んでいるとスレッドセーフでない実装がちらほらありました。
## 課題を解決するアーキテクチャと機能
これらのライブラリは、デベロッパーに対するインターフェイスに重きを置いていてパフォーマンスの重要度は低いのかもしれませんが、今回は、パフォーマンスにも配慮した実装をしました。
![htask.png](https://qiita-image-store.s3.amazonaws.com/0/65193/41ed0ad8-3f35-b389-963f-53e4ec7985e0.png)
### タスク
タスクはキャンセル可能です。
### スケジューラ
1つの goroutine をスケジューラとして立ち上げ、`タスクの優先度付きキューへ追加` と、一番近い時間に実行する `1つのタスクの監視` をシングルスレッドで行います。
タスクの優先度付きキューは、[最小ヒープ](https://ja.wikipedia.org/wiki/%E4%BA%8C%E5%88%86%E3%83%92%E3%83%BC%E3%83%97) というデータ構造で実現され、実行する時刻のタイムスタンプが一番小さいものを `O(1)` で取得でき、タスクの追加と削除を `O(logN)` で行えます。
タスクの追加と監視、後述のワーカーへの通知は全て `チャネル` を使って行い、`select` を使って同時に待ちます。
実装は [この部分](https://github.com/kawasin73/htask/blob/master/scheduler.go#L133-L164) です。
### ワーカー
スケジューラの作成と同時に複数の `ワーカーgoroutine` を立ち上げ待機させます。実行時刻になったタスクがチャネル経由で ワーカーgoroutine に配給され実行されます。
`ワーカーgoroutine` を事前に立ち上げることで、同時実行されるタスクの量を制限することができます。ワーカーの処理が詰まると時間になってもタスクが実行されない可能性があるというデメリットがありますが、goroutine の数が限定されるため、**goroutine で使うメモリ量の上限を制限できる** というメリットがあります。
もし、同時実行されるタスクの数が多くなることが予想されるのであれば、ワーカーの数を多めに設定する必要があります。
また、ワーカー数を `0` に設定した時に、ワーカーは利用せずに **タスクの実行ごとに goroutine を立ち上げ実行することもできるオプション** を用意しました。これにより、使用メモリ量の上限の管理はタスクを追加する側の責任になってしまいますが、**I/Oにより多くの時間を使うようなタスクが多い場合**、goroutine により簡単に多重化できる特性を存分に使えるようになります。
## 使い方と便利なインターフェイス
このスケジューラライブラリのインターフェースは以下の通りです。
- `func NewScheduler(wg *sync.WaitGroup, workers int) *Scheduler`
- スケジューラの作成を行います。作成した時点で自動的に `スケジューラ goroutine` と `ワーカー goroutine` が起動されます。立ち上げられた goroutine の数だけ、`sync.WaitGroup` にカウントされます。
- `func (s *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error`
- キャンセルを通知するチャネルと、実行する時間、実行する関数(タスク)を渡します。
- キャンセルを通知するチャネルは `nil` でも可能です。その場合はキャンセルはできなくなります。`context` パッケージの `context.Context.Done()` を使えることを意識しています。
- `func (s *Scheduler) ChangeWorkers(workers int) error`
- ワーカー数はスケジューラのダウンタイムなく動的に変更可能です。
- `func (s *Scheduler) Close() error`
- `スケジューラ goroutine` と `ワーカー goroutine` を終了させます。未実行タスクは全て破棄されます。`NewScheduler` で渡した `sync.WaitGroup` を待つことで全ての goroutine の終了を知ることができます。
### `github.com/kawasin73/htask/cron`
このスケジューラライブラリは洗練されたインターフェイスにしていますが、普通のデベロッパからは使いにくいかもしれないため、スケジューラをいい感じにラップする `cron.Cron` というコンポーネントを作り、ヒューマンフレンドリーなインターフェイスを提供しました。
簡単な使い方は以下の通りです。
```go
// executed every 10:11 AM.
c.Every(1).Day().At(10, 11).Run(task)
// task will be executed in every 1 minute from now.
c.Every(1).Minute().Run(task)
tenSecondsLater := time.Now().Add(10 * time.Second)
// executed in every 2 seconds started from 10 seconds later.
cancel, err := c.Every(2).Second().From(tenSecondsLater).Run(task)
if err != nil {
// handle error
}
// cron can schedule one time task.
c.Once(tenSecondsLater.Add(time.Minute)).Run(func() {
// task can be cancelled.
cancel()
})
```
## 実装
スケジューラの部分の実装を記載します。全ての実装は、[ソースコード](https://github.com/kawasin73/htask/blob/master/scheduler.go) を参照してください。
```go:scheduler.go
type job struct {
chCancel <-chan struct{}
t time.Time
task func(time.Time)
}
// Scheduler is used to schedule tasks.
type Scheduler struct {
chClose chan struct{}
wg *sync.WaitGroup
chJob chan job
chWork chan job
chFin chan struct{}
chWorkers chan int
wNum int
}
// Set enqueue new task to scheduler heap queue.
// task will be cancelled by closing chCancel. chCancel == nil is acceptable.
func (c *Scheduler) Set(chCancel <-chan struct{}, t time.Time, task func(time.Time)) error {
if t.IsZero() {
return ErrInvalidTime
} else if task == nil {
return ErrInvalidTask
}
select {
case <-c.chClose:
return ErrClosed
case <-chCancel:
return ErrTaskCancelled
case c.chJob <- job{chCancel: chCancel, t: t, task: task}:
return nil
}
}
type scheduleState struct {
heap *minHeap
job job
chWork chan<- job
timer *time.Timer
expired bool // timer is expired or not
}
func newScheduleState(heapSize int) *scheduleState {
timer := time.NewTimer(time.Second)
if !timer.Stop() {
<-timer.C
}
return &scheduleState{
heap: newMinHeap(heapSize),
timer: timer,
expired: true,
}
}
func (s *scheduleState) add(newJob job) error {
if err := s.heap.add(newJob); err != nil {
return err
}
if !s.expired && !s.timer.Stop() {
<-s.timer.C
}
s.job = s.heap.peek()
s.chWork = nil
// s.job must not be empty
s.timer.Reset(s.job.t.Sub(time.Now()))
s.expired = false
return nil
}
func (s *scheduleState) next() {
if !s.expired && !s.timer.Stop() {
<-s.timer.C
}
_ = s.heap.pop()
s.job = s.heap.peek()
s.chWork = nil
if s.job.t.IsZero() {
s.expired = true
} else {
s.timer.Reset(s.job.t.Sub(time.Now()))
s.expired = false
}
}
func (s *scheduleState) time(t time.Time, chWork chan<- job) {
s.expired = true
s.chWork = chWork
s.job.t = t
}
func (c *Scheduler) scheduler(wg *sync.WaitGroup, workers int) {
defer wg.Done()
// no limited min heap
// TODO: use limited heap
state := newScheduleState(0)
for {
select {
case <-c.chClose:
return
case workers = <-c.chWorkers:
if workers == 0 && state.chWork != nil {
go state.job.task(state.job.t)
state.next()
}
case newJob := <-c.chJob:
if err := state.add(newJob); err != nil {
// TODO: heap is unlimited then no error will occur
panic(err)
}
case <-state.job.chCancel:
state.next()
case t := <-state.timer.C:
state.time(t, c.chWork)
if workers == 0 {
go state.job.task(state.job.t)
state.next()
}
case state.chWork <- state.job:
state.next()
}
}
}
func (c *Scheduler) worker(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-c.chClose:
return
case <-c.chFin:
return
case j := <-c.chWork:
j.task(j.t)
}
}
}
```
### `func (c *Scheduler) scheduler(wg *sync.WaitGroup, workers int)`
スケジューラのキモはここなので、ここの解説をします。
`case <-c.chClose` と `case workers = <-c.chWorkers` は、終了検知とワーカ数の変更検知をしています。
`case newJob := <-c.chJob` では、`Set()`によって追加された新しいタスクをヒープに保存し、最小タイムスタンプのタスク(以下、`最小タスク`と呼びます)を更新しています。
スケジューラでは、`state.job` に最小タイムスタンプのタスクを保持し、その状態を見ながら操作を行います。
-最小タスクは `state.job` に保存され、`ジョブがない`、`待ち`、`キャンセルされた`、`時間がきた`、`ワーカーに渡された`の5状態をもち、スケジューラはステートマシンのように振る舞います。
+最小タスクは `state.job` に保存され、`Empty : ジョブがない`、`Pending : 待ち`、`Cancelled : キャンセルされた`、`Expired : 時間がきた`、`Sent to Worker : ワーカーに渡された`の5状態をもち、スケジューラはステートマシンのように振る舞います。
+
+![htask-state.png](https://qiita-image-store.s3.amazonaws.com/0/65193/d45999a6-81cb-c9b3-8f4e-014d7ce6fb81.png)
+
+
`case newJob := <-c.chJob` によって、最小タスクは `待ち` 状態になります。
-`キャンセルされた` 最小タスクは、`case <-state.job.chCancel` によって次の最小タスクが `state.job` に `待ち` 状態となって設定されます。
+キャンセルされた最小タスクは、`case <-state.job.chCancel` によって `キャンセルされた` 状態に移り、次の最小タスクが `state.job` に `待ち` 状態となって設定されます。
-`時間がきた` 最小タスクは、`case t := <-state.timer.C` によって、`ワーカーに渡された` 状態に移ります。この状態は、`state.chWork` が `nil` からチャネルを設定されることによって表させれます。もし、つど goroutine を立ち上げるオプションが有効(`workers == 0`)な場合は、その場でワーカーを起動し、次の最小タスクを `待ち` 状態で設定します。
+`待ち` 最小タスクは、`case t := <-state.timer.C` によって、`時間がきた` 状態に移ります。この状態は、`state.chWork` が `nil` からチャネルを設定されることによって表させれます。もし、つど goroutine を立ち上げるオプションが有効(`workers == 0`)な場合は、その場でワーカーを起動し、次の最小タスクを `待ち` 状態で設定します。
-`ワーカーに渡された` 最小タスクは、`case state.chWork <- state.job` によって、次の最小タスクが `state.job` に `待ち` 状態となって設定されます。
+`時間がきた` 最小タスクは、`case state.chWork <- state.job` によって `ワーカーに渡された`に移り、次の最小タスクが `state.job` に `待ち` 状態となって設定されます。
`ジョブがない` 状態の場合は、`state.timer.C` はリセットされた状態で、`state.job.chCancel` は `nil` になるため、タスクの追加がされるまでこのステートマシンは停止します。
## この実装の課題と展望
この実装では、以下の課題があります。
キャンセルされたタスクを即座に最小ヒープから削除できません。キャンセルの検知はそのタスクが最小タスクになるまで行われないため、ヒープの容量を無駄に利用することになります。解決策としては、定期的にヒープ内に保管されているタスクを全て終了チェックすることですが、めんどくさかったので今回は実装していません。
タスク数の上限を設定できません。最小ヒープに保存されるタスクの数を制限すれば実現できるのですが、タスク数の上限に達した時に、`Set()` インターフェイスにタスク数の上限を伝える仕組みのいい実装が思い浮かばなかったので保留しています。`Set()` で都度タイマーチャネルを生成してタイムアウトさせるか、`sync.Mutex` で保護された状態を`Set()` 内でチェックするか(根本的な解決になっていない)という方法が考えつきましたが、もっといい方法がありそうなので保留してます。ちなみに、`Set()`できるまでずっと待たせるとワーカーの中から追加した時にデッドロックを起こすので、失敗させる必要があります。
課題ではありませんが、スケジューラのマルチスレッド化も可能です。スケジューラは複数同時並列に動いても問題ない実装なので複数の `スケジューラ goroutine` を立ち上げることもできますが、めんどくさかったし必要もなさそうなので実装していません。
## 最後に
うさちゃん使ってるの数人なので、スケーラブルなスケジューラは必要ないのですが、思いついてしまったので作りました。楽しかったです。