『Go言語による並行処理』を読んでContext
チョットワカルようになったので記録に残します。
Context
の解決したい問題
- Goは並行処理のためにゴルーチンを簡単に生成できる
- 多くの場合ゴルーチンは他のゴルーチンと連携しており、子のゴルーチンが処理を続けるべきかを知ることでリソースを効率的に利用できる
- それを実現する標準的なパターンが必要となり
Context
パッケージができた
使う時にどうするのが良いのか
実際に確かめてみました。
サンプルコードはgithubにあげてます。
Proxy
別の場所からリクエストを受け取って次のサーバー流す場合。
Proxy自身も個別にタイムアウト設定を持っており渡した先で一定時間に終わらない場合にタイムアウトをさせます。
childCtx, cancel := context.WithCancel(ctx)
でcancel関数とそれとつながったContextを生成しそれを次のサーバーに渡し、Proxyがtimeoutだと判断したらcancel()
を呼びます。
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
// Call server's method with cancelable context by proxy
res, err := p.Server.Call(childCtx, task)
ch <- ProxiedResponce{
Responce: res,
Error: err,
}
}()
go func() {
// Call cancel after proxy timeout
<-time.After(p.Timeout)
cancel()
}()
サンプルの結果は以下のとおりです。
# Responce表示は[(計測時間)/(TaskのTimeout設定)]
# Clientは`70ms`でTimeoutするContextを渡す
# Proxyのtimeoutは45ms
# 30msで完了する`Fast`Serverにリクエストを渡した場合
Proxy: through[TO:45ms] Fast: Timeout [20.1622ms/20ms] Timeout by Server # time 20msなのでServerにてTimeout
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/60ms] <nil>
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/40ms] <nil>
Proxy: through[TO:45ms] Fast: Complete [30.9573ms/80ms] <nil>
# 90msで完了する`Slow`Serverにリクエストを渡した場合
Proxy: through[TO:45ms] Slow: Timeout [20.1622ms/20ms] Timeout by Server
Proxy: through[TO:45ms] Slow: Timeout [40.8834ms/40ms] Timeout by Server
Proxy: through[TO:45ms] Slow: Cancel [45.0796ms/80ms] context canceled # ProxyによるCancel
Proxy: through[TO:45ms] Slow: Cancel [45.0796ms/60ms] context canceled # ProxyによるCancel
# Proxyのtimeoutを75msに変更
# 30msで完了する`Fast`Serverにリクエストを渡した場合
Proxy: through[TO:75ms] Fast: Timeout [20.1622ms/20ms] Timeout by Server # time 20msなのでServerにてTimeout
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/60ms] <nil>
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/80ms] <nil>
Proxy: through[TO:75ms] Fast: Complete [30.9573ms/40ms] <nil>
# 90msで完了する`Slow`Serverにリクエストを渡した場合
Proxy: through[TO:75ms] Slow: Timeout [20.1622ms/20ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Timeout [40.8041ms/40ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Timeout [61.0672ms/60ms] Timeout by Server
Proxy: through[TO:75ms] Slow: Cancel [70.9447ms/80ms] context deadline exceeded # Clientの指定したTimeoutによるCancel
コード全体を示します。Clientに返すのにサーバーからの戻り値を受けていますがプロキシ自身がエラーメッセージを生成して戻したりWithTimeout
でより短いコンテキストを生成しなおすというパターンもありそうです。
キャンセル時のRollback完了まで確認が必要か、誰がキャンセルをしたのか明示が必要か、などによってそのあたり変わりそうですね。
// ProxyProcessWithTimeout pass the request to the next process
// and interrupts the request by its own time out setting
type ProxyProcessWithTimeout struct {
Name string
Server Process
Timeout time.Duration
}
func (p *ProxyProcessWithTimeout) Call(ctx context.Context, task Task) (Responce, error) {
type ProxiedResponce struct {
Responce Responce
Error error
}
// Generate goroutine
ch := make(chan ProxiedResponce)
// Get a cancel function to cancel by proxy
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
// Call server's method with cancelable context by proxy
res, err := p.Server.Call(childCtx, task)
ch <- ProxiedResponce{
Responce: res,
Error: err,
}
}()
go func() {
// Call cancel after proxy timeout
<-time.After(p.Timeout)
cancel()
}()
// wait responce
pres := <-ch
res := Responce(fmt.Sprintf("%s: through[TO:%s] %s", p.Name, p.Timeout, string(pres.Responce)))
return res, pres.Error
}
Process
重い処理を行う場合は受け取ったContext
から割り込みを適切に受けるのが良いです。
一定の短いループに分解できるのであれば途中で<-ctx.Done()
を確認して中断をすべきかを判断させます。
for {
// 何らかの処理...
// ...
//
select {
// Cancel, Time
case <-ctx.Done():
// rollback
return
default:
// 終了条件
}
}
サンプルの結果は以下のとおりです。
# Timeout [(計測時間)/(TaskのTimeout設定)]
# Clientは`70ms`でTimeoutするContextを渡す
# Proxyは`75ms`でContextをCancelする
Proxy: through[TO:75ms] l:1ms: Complete [loop: 20] [33.804ms/40ms] <nil>
Proxy: through[TO:75ms] l:1ms: Timeout [41.9723ms/40ms] Timeout by Simulation
Proxy: through[TO:75ms] l:1ms: Cancel [70.7212ms/80ms] context deadline exceeded # Contextのシグナルを受けて中断できている
Proxy: through[TO:75ms] l:1ms: Cancel [70.7212ms/80ms] context deadline exceeded
サンプルの全体は以下のとおりです。
Tick時間かかる処理をn回行う(task.Valueから取る)とし、ループのたびにcontextを確認します。
例では1tick=1ms
で動かしてますが20loopの実行に30msかかってますね。どこに時間がかかってるかまた調べます。
// ProcessSimulation simurate the real process.
// This is an example of a process that can be suspended from Context
type ProcessSimulation struct {
Name string
Tick time.Duration
}
func (p *ProcessSimulation) Call(ctx context.Context, task Task) (Responce, error) {
start := time.Now()
loopMax, ok := task.Value.(int)
if !ok {
return Responce(""), errors.Errorf("Task Value is not int")
}
if loopMax < 1 {
return Responce(""), errors.Errorf("Task.Value must int type and 1 or more")
}
// Heavy loop process
var counter int
timeout := time.After(task.Timeout)
for {
if counter >= loopMax {
return Responce(fmt.Sprintf("%s: Complete [loop: %d] [%s/%s]", p.Name, counter, time.Since(start), task.Timeout)), nil
}
counter++
// Here is check Context and time out setting.
select {
case <-ctx.Done():
// Add Rollback process when need
return Responce(fmt.Sprintf("%s: Cancel [%s/%s]", p.Name, time.Since(start), task.Timeout)), ctx.Err()
case <-timeout:
// Add Rollback process when need
return Responce(fmt.Sprintf("%s: Timeout [%s/%s]", p.Name, time.Since(start), task.Timeout)), errors.Errorf("Timeout by Simulation")
case <-time.After(p.Tick):
}
}
}
ソースコードを読む
次はなぜWith***
で設定をするのか、複数回WithTimeout()
を呼んだらどうなるのか知るためにソースコード(context.go)を読んでみました。
- Contextは
With***(ctx,args)
のたびに新しいインスタンスを生成している - それらはContext内で宣言された
*emptyCtx
を終端に持つ連結リストの子孫である - Cancelは常に親から子に伝播し親は影響を受けない
- Valueは子から親に向かってのみ参照する
という実装になっていました。これで安心してキャンセルできますね。
Cancelの実装
Contextのキャンセルの実装はcancelCtx
にあります。Timeout
,Deadline
もcancelCtx
を埋め込んであり指定の時間が来たらキャンセルをするだけです。
そのキャンセルの実装が次の部分です
- [func (c *cancelCtx) cancel(removeFromParent bool, err error)]
(https://github.com/golang/go/blob/master/src/context/context.go#L347-L372)
自身の状態を見てキャンセルされていない場合は状態を変更し(err fieldをnon-nilにする)子にも同じerror
を伝え、子を連結リストから取り除きます。
また自身を親の持つ子のリスト取り除きます。
親子関係の設定はWith***
の際にfunc propagateCancel(parent Context, child canceler)
を呼ぶことで行われます。
Valueの実装
WithValue(ctx, key, value)
は指定されたkey
とvalue
とctx
への参照を持つ*valueCtx
を返します。
Value(key)
が呼ばれるとkey
が一致するまで親の方向をたどります。
すべての親はbackground *emptyCtx
であり常にnil
を返すのでkey
が一致する最も近い*valueCtx
の値かnil
のどちらかが返ってきます。
Done()から得られる値
Doneの戻り値は<-chan struct{}
です。
クローズしてあるチャンネルはその型のデフォルト値とtrue
を返します。
故にどこでどれだけDone()で待っていてもチャンネル閉じられたなら処理は次に進みます。
まとめ
以上Context
のキャンセル処理について試しました。
今思うと上記サンプルのResponceにはerrorを含んだほうが良かった気がします。
こういうのを考えるのは楽しいですね。
Context
をhttp.Request
などから渡されるところから入ると単なるキャリアのように感じますが(私がそうでした)パフォーマンスを出すために裏側でゴルーチンが動いておりその背景からContext
を引き継いでいるのだということでいろいろ納得をしました。
パフォーマンスを出すうえでゴルーチンで並行処理をすると思うのでその際は以下に注意して作ると良さそうです。
-
Context
を引数にとって割り込み可能に作る - 重い処理はどこで割り込ませるのか、Rollback処理、エラーの伝播を設計時に考えておく