はじめに
こんにちは!@uh-zzです!
この記事は、Go Advent Calendar 2022の 10 日目の記事になります!
今年は、個人的に色々なことに挑戦した年だったなあと振り返るとともに、去年のアドベントカレンダーからもう1年経つのか〜という気持ちです
(去年もパターンについて書いてました笑)
この記事では、Go における Future パターンの紹介と使わている OSS を見ていきたいと思います
いったんCM
社のアドベントカレンダーも絶賛開催中です!
想定読者
- Go でプログラミングを始めた人
- ゴリゴリ Go を書いている人
- 他言語で並行プログラミングしてる人
Future パターンとは
あるメソッドを呼び出すとします。 もしもオブジェクトが、そのメソッドを実行できる状態なら、実行します。 でも、実行できない状態なら、将来実行できる状態になるまで待つようにしたいとします。 その時に使えるのがこの Future パターンです。 future は「未来」という意味です
もう少し正確にお話しましょう。 単にあるクラスに 「実行できる状態になるまで待つ」 という機能を入れるわけではありません。 すでに存在しているクラスに一皮かぶせて、 「実行できる状態になるまで待てるような機能を追加する」 というのが Future パターンです。
上記の参考記事内では、Java をつかったマルチスレッドプログラミングで Future パターンが実装されています。
引用箇所の説明がほぼすべてですが、イメージ図で補足するとこんな感じになります
呼び出し元と処理するメソッドの間に Future メソッドを挟むことで、Future メソッドがプロキシ的に働き、非同期的に処理するメソッドを実行できるようになっています。
Go だとこんなかんじにかけるらしい
以下の記事で Future/Promise
という説明がされています
※記事内にあるコードに、コメントをつけて引用させていただきます
package main
func readFile(path string) chan string {
// ファイルを読み込み、その結果を返すFutureを返す
promise := make(chan string)
// readFile とは別のゴルーチンでファイルを読み出す
go func() {
content, err := os.ReadFile(path)
if err != nil {
fmt.Printf("read error %s\n", err.Error())
close(promise)
} else {
// 約束を果たした
promise <- string(content)
}
}()
return promise
}
func printFunc(futureSource chan string) chan []string {
// 文字列中の関数一覧を返すFutureを返す
promise := make(chan []string)
// printFunc とは別のゴルーチンで文字列操作する
go func() {
var result []string
// futureSource は readFile で読みだしたファイルの中身です
//
// readFile(ファイル読み込み)が完了して、 futureSource(=promise) に
// 中身が送信されるまでこの処理は実行されません
for _, line := range strings.Split(<-futureSource, "\n") {
if strings.HasPrefix(line, "func ") {
result = append(result, line)
}
}
// 約束を果たした
promise <- result
}()
return promise
}
func main() {
futureSource := readFile("future_promise.go")
// 一見、 readFile が実行されたあとに、すぐ printFunc が実行されるように見えます
// しかし、 printFunc の引数(futureSource)がチャネルになっているので、
// futureSourceが値を受信するまで関数内で、futureSource を使うことができない
//
// よって関数内で実行待ちが発生します
futureFuncs := printFunc(futureSource)
// チャネル(futureFuncs)を受信するまでブロック
fmt.Println(strings.Join(<-futureFuncs, "\n"))
}
Java で実現していた Future メソッドとは違い、Go ではゴルーチン、クロージャー、チャネルをつかって実行待ちを表現できるようです。
実際に使われている OSS を見てみた
そもそものきっかけは、『Go 言語による分散サービス』オンライン読書会に参加したときに、書籍のサンプルの中で使われていたことでした
その時のツイート
サンプルがつかっている OSS は raft
です
raft
は hashicorp 社が提供している Go のライブラリで、同じ名前のRaft
という分散合意形成アルゴリズムの実装です
Raft
についての説明は割愛しますが、以下の記事がわかりやすかったのでリンクさせていただきます
まずは、raft
を使っているソースコードです
このコードは、Go 言語による分散サービスのリポジトリのものです
future := l.raft.Apply(buf.Bytes(), timeout)
if future.Error() != nil {
return nil, future.Error()
}
res := future.Response()
if err, ok := res.(error); ok {
return nil, err
}
ログデータであるbuf.Byte()
をタイムアウト時間を設定して、分散システムの一貫性を保ちながら保存しています
注目するのは、返ってきた値をfuture
という変数で名前をつけて後続で処理しているところです。
これは、Apply
の直後のコードif future.Error() != nil
が実行されるタイミングで、まだfuture
に値が入ってない(Apply
が実行されていない)かもしれないということです。
Apply
の実装はこちらです
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
return r.ApplyLog(Log{Data: cmd}, timeout)
}
ApplyFuture
を返していることがわかりますね
ここでFuture
という名前が使われていることからパターンに沿っていそうな雰囲気を感じます。
ApplyFuture
はインターフェースとして用意されています
便宜上、定義されている順番とコメントを省略してます
type ApplyFuture interface {
IndexFuture
Response() interface{}
}
type IndexFuture interface {
Future
Index() uint64
}
type Future interface {
Error() error
}
上記から読み取れることに、ApplyFuture
インターフェースに埋め込まれたインターフェースからError()
/Response()
/Index()
メソッドが実装されていることが条件ということがわかります
ここで、Apply
メソッド内で実行されているApplyLog
メソッドがApplyFuture
を返していることを確認します
ポイントは、ApplyLog
内のselect
の処理です
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
ざっくりと、チャネルによってタイムアウトした場合とシャットダウン状態になった場合はerrorFuture
、そしてlogFuture
を返していることがわかります
この2つの構造体はどちらもApplyFuture
を満たします。
かつ、この時点でチャネルの受信と送信待ちをしていることから非同期処理になっていることを確認できます。
logFuture
の実装を見てみます。
type logFuture struct {
deferError
log Log
response interface{}
dispatch time.Time
}
func (l *logFuture) Response() interface{} {
return l.response
}
func (l *logFuture) Index() uint64 {
return l.log.Index
}
func (d *deferError) Error() error {
if d.err != nil {
return d.err
}
if d.errCh == nil {
panic("waiting for response on nil channel")
}
select {
case d.err = <-d.errCh:
case <-d.ShutdownCh:
d.err = ErrRaftShutdown
}
return d.err
}
ここでError()
からわかるように、シャットダウンまたはerrCh
にエラーが入るまで待ちが発生することがわかります。
もう一度サンプルのコードを見てみます
future := l.raft.Apply(buf.Bytes(), timeout)
if future.Error() != nil {
return nil, future.Error()
}
以上を踏まえて以下の動作をすることがわかったと思います。
-
errorFuture
の場合、Apply
メソッドの中で待ちが発生する- タイムアウトする場合
- シャットダウンする場合
-
logFuture
の場合、Apply
メソッド内での送信待ち、もしくはError()
メソッドの中で待ちが発生する
おわりに
Future パターンを取り上げてみましたが、蓋を開けてみるとチャネルを使った並行プログラミングでよく目にするような処理に、”名前がついてたんだ〜!”と思う方もいるでしょう(私のことです)
パターンを知る → 使っている OSS を見にいく流れは体験としていいなと思ったので、来年も引き続きやっていきます