LoginSignup
2
3

More than 1 year has passed since last update.

【Go/並行処理】Futureパターンってなにか調べてみた!

Last updated at Posted at 2022-12-09

はじめに

こんにちは!@uh-zzです!

この記事は、Go Advent Calendar 2022の 10 日目の記事になります!

今年は、個人的に色々なことに挑戦した年だったなあと振り返るとともに、去年のアドベントカレンダーからもう1年経つのか〜という気持ちです

(去年もパターンについて書いてました笑)

この記事では、Go における Future パターンの紹介と使わている OSS を見ていきたいと思います

いったんCM

社のアドベントカレンダーも絶賛開催中です!

想定読者

  • Go でプログラミングを始めた人
  • ゴリゴリ Go を書いている人
  • 他言語で並行プログラミングしてる人

Future パターンとは

あるメソッドを呼び出すとします。 もしもオブジェクトが、そのメソッドを実行できる状態なら、実行します。 でも、実行できない状態なら、将来実行できる状態になるまで待つようにしたいとします。 その時に使えるのがこの Future パターンです。 future は「未来」という意味です

もう少し正確にお話しましょう。 単にあるクラスに 「実行できる状態になるまで待つ」 という機能を入れるわけではありません。 すでに存在しているクラスに一皮かぶせて、 「実行できる状態になるまで待てるような機能を追加する」 というのが Future パターンです。

出典: 結城浩, Future パターン, デザインパターン紹介

上記の参考記事内では、Java をつかったマルチスレッドプログラミングで Future パターンが実装されています。

引用箇所の説明がほぼすべてですが、イメージ図で補足するとこんな感じになります

呼び出し元と処理するメソッドの間に Future メソッドを挟むことで、Future メソッドがプロキシ的に働き、非同期的に処理するメソッドを実行できるようになっています。

Go だとこんなかんじにかけるらしい

以下の記事で Future/Promiseという説明がされています

※記事内にあるコードに、コメントをつけて引用させていただきます:pray:

package main

func readFile(path string) chan string {
  // ファイルを読み込み、その結果を返すFutureを返す
  promise := make(chan string)

	// readFile とは別のゴルーチンでファイルを読み出す
  go func() {
    content, err := os.ReadFile(path)
    if err != nil {
      fmt.Printf("read error %s\n", err.Error())
      close(promise)
    } else {
      // 約束を果たした
      promise <- string(content)
    }
  }()
  return promise
}

func printFunc(futureSource chan string) chan []string {
  // 文字列中の関数一覧を返すFutureを返す
  promise := make(chan []string)

	// printFunc とは別のゴルーチンで文字列操作する
  go func() {
    var result []string

		// futureSource は readFile で読みだしたファイルの中身です
		//
		// readFile(ファイル読み込み)が完了して、 futureSource(=promise) に
		// 中身が送信されるまでこの処理は実行されません
    for _, line := range strings.Split(<-futureSource, "\n") {
      if strings.HasPrefix(line, "func ") {
        result = append(result, line)
      }
    }
    // 約束を果たした
    promise <- result
  }()
  return promise
}

func main() {
  futureSource := readFile("future_promise.go")

	// 一見、 readFile が実行されたあとに、すぐ printFunc が実行されるように見えます
	// しかし、 printFunc の引数(futureSource)がチャネルになっているので、
	// futureSourceが値を受信するまで関数内で、futureSource を使うことができない
	//
	// よって関数内で実行待ちが発生します
  futureFuncs := printFunc(futureSource)

	// チャネル(futureFuncs)を受信するまでブロック
  fmt.Println(strings.Join(<-futureFuncs, "\n"))
}

Java で実現していた Future メソッドとは違い、Go ではゴルーチン、クロージャー、チャネルをつかって実行待ちを表現できるようです。

実際に使われている OSS を見てみた

そもそものきっかけは、『Go 言語による分散サービス』オンライン読書会に参加したときに、書籍のサンプルの中で使われていたことでした

その時のツイート

サンプルがつかっている OSS は raft です

raftは hashicorp 社が提供している Go のライブラリで、同じ名前のRaftという分散合意形成アルゴリズムの実装です

Raftについての説明は割愛しますが、以下の記事がわかりやすかったのでリンクさせていただきます

まずは、raftを使っているソースコードです
このコードは、Go 言語による分散サービスのリポジトリのものです

	future := l.raft.Apply(buf.Bytes(), timeout)
	if future.Error() != nil {
		return nil, future.Error()
	}
	res := future.Response()
	if err, ok := res.(error); ok {
		return nil, err
	}

ログデータであるbuf.Byte()をタイムアウト時間を設定して、分散システムの一貫性を保ちながら保存しています
注目するのは、返ってきた値をfutureという変数で名前をつけて後続で処理しているところです。

これは、Applyの直後のコードif future.Error() != nilが実行されるタイミングで、まだfutureに値が入ってない(Applyが実行されていない)かもしれないということです。

Applyの実装はこちらです

func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
	return r.ApplyLog(Log{Data: cmd}, timeout)
}

ApplyFutureを返していることがわかりますね
ここでFutureという名前が使われていることからパターンに沿っていそうな雰囲気を感じます。

ApplyFutureはインターフェースとして用意されています

便宜上、定義されている順番とコメントを省略してます

type ApplyFuture interface {
	IndexFuture

	Response() interface{}
}

type IndexFuture interface {
	Future

	Index() uint64
}

type Future interface {
	Error() error
}

上記から読み取れることに、ApplyFutureインターフェースに埋め込まれたインターフェースからError()/Response()/Index()メソッドが実装されていることが条件ということがわかります

ここで、Applyメソッド内で実行されているApplyLogメソッドがApplyFutureを返していることを確認します
ポイントは、ApplyLog内のselectの処理です

	select {
	case <-timer:
		return errorFuture{ErrEnqueueTimeout}
	case <-r.shutdownCh:
		return errorFuture{ErrRaftShutdown}
	case r.applyCh <- logFuture:
		return logFuture
	}

ざっくりと、チャネルによってタイムアウトした場合とシャットダウン状態になった場合はerrorFuture、そしてlogFutureを返していることがわかります

この2つの構造体はどちらもApplyFutureを満たします。
かつ、この時点でチャネルの受信と送信待ちをしていることから非同期処理になっていることを確認できます。

logFutureの実装を見てみます。

type logFuture struct {
	deferError
	log      Log
	response interface{}
	dispatch time.Time
}

func (l *logFuture) Response() interface{} {
	return l.response
}

func (l *logFuture) Index() uint64 {
	return l.log.Index
}

func (d *deferError) Error() error {
	if d.err != nil {
		return d.err
	}
	if d.errCh == nil {
		panic("waiting for response on nil channel")
	}
	select {
	case d.err = <-d.errCh:
	case <-d.ShutdownCh:
		d.err = ErrRaftShutdown
	}
	return d.err
}

ここでError()からわかるように、シャットダウンまたはerrChにエラーが入るまで待ちが発生することがわかります。

もう一度サンプルのコードを見てみます

	future := l.raft.Apply(buf.Bytes(), timeout)
	if future.Error() != nil {
		return nil, future.Error()
	}

以上を踏まえて以下の動作をすることがわかったと思います。

  • errorFutureの場合、Applyメソッドの中で待ちが発生する
    • タイムアウトする場合
    • シャットダウンする場合
  • logFutureの場合、Applyメソッド内での送信待ち、もしくはError()メソッドの中で待ちが発生する

おわりに

Future パターンを取り上げてみましたが、蓋を開けてみるとチャネルを使った並行プログラミングでよく目にするような処理に、”名前がついてたんだ〜!”と思う方もいるでしょう(私のことです)

パターンを知る → 使っている OSS を見にいく流れは体験としていいなと思ったので、来年も引き続きやっていきます:wave:

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3