LoginSignup
8
5

More than 3 years have passed since last update.

Goでパイプライン処理っぽいことをする

Last updated at Posted at 2015-04-04

マルチコア時代のコンピューティングが云々。という感じで並列処理を作成する時、シェルのパイプライン処理はとても記述がわかりやすくて好きです。

seq(10) | fizzbuzz | stdout

というような記述で、seq fizzbuzz stdoutの3つのプログラムが別スレッドとして立ち上がり、seqから標準出力があると、それをfizzbuzzが受け取って処理をしてさらに下流の処理に受け渡す…という動きをします。
別スレッド! Go言語が得意そうです。
似たようなことができないか実装してみました。

始めに言っておきますが、この記事は

あたりをパクっているリスペクトしています。

tl;dr

  • Goでパイプライン処理っぽいことが出来ないか手を動かしながら考えてみたよ。
  • usa っていうパッケージにまとめてみたよ。

素直な実装

以下のような処理をひたすらつなげることでパイプライン処理を作成できます。

func hoge(in<- chan T1) <-chan T2 {
  out := make(chan T2)
  go func() {
    for {
      if val, ok := <-in; ok {
        // 上流の処理から入力があった場合の処理
      } else {
        // 上流の処理から入力が終了している場合の処理
      }
    }
    // 処理が終わったので下流へのデータ受け渡しchannelを閉じる
    close(out)
  }()
  return out
}

// 上流からの入力終了時に何の処理も必要ないなら、以下のように書くことも可能
func hoge(in<- chan T1) <-chan T2 {
  out := make(chan T2)
  go func() {
    for val := range out{
      // 上流の処理から入力があった場合の処理
    }
    // 処理が終わったので下流へのデータ受け渡しchannelを閉じる
    close(out)
  }()
  return out
}

具体的には以下のようになるはずです。
処理の開始には上流からのデータ渡し(in)がなく、最後の処理には下流へのデータ渡し(out)が不要なので少し細工が必要です。
今回は最後の処理のoutには、プログラム内で使うことはありませんがboolを便宜上渡すことにしました。

pipe-v1.go
package main

import (
  "fmt"
)

func main() {
  pipe1 := seq(15)
  pipe2 := fizzbuzz(pipe1)
  pipe3 := stdout(pipe2)
  // または
  // pipe3 := stdout(fizzbuzz(seq(15)))

  for {
    if _, ok := <-pipe3; !ok {
      // chan `pipe3` is close
      break
    }
  }
}

func seq(i int) chan int {
  out := make(chan int)
  go func() {
    for n := 0; n < i; n++ {
      out <- n + 1
    }
    close(out)
  }()
  return out
}

func fizzbuzz(in chan int) chan string {
  out := make(chan string)
  go func() {
    var str string
    for val := range in {
      if val%15 == 0 {
        str = "FizzBuzz"
      } else if val%3 == 0 {
        str = "Fizz"
      } else if val%5 == 0 {
        str = "Buzz"
      } else {
        str = fmt.Sprint(val)
      }
      out <- str
    }
    close(out)
  }()
  return out
}

func stdout(in chan string) chan bool {
  out := make(chan bool)
  go func() {
    for val := range in {
      fmt.Println(val)
      out <- true
    }
    close(out)
  }()
  return out
}

重複部分をまとめる

毎度毎度goroutineを作っている部分は冗長です。とくにfizzbizz stdoutは両方共、引数・戻り値の型以外はほとんど構造的にかわりません。
ということで、最上流以外の関数をある程度まとめてみます。

pipe-v2.go
package main
import (
  "fmt"
  "reflect"
)

func pipe(in <-chan interface{}, f interface{}) <-chan interface{} {
  out := make(chan interface{})
  fv := reflect.ValueOf(f)
  if fv.Kind() != reflect.Func {
    // fが関数でない場合はchannelを閉じて終了
    panic("get none functional argument.")
    close(out)
    return out
  }

  go func() {
    // 上流からの入力をfで処理して下流に渡す
    for val := range in {
      arg := reflect.ValueOf(val)
      ret := fv.Call([]reflect.Value{arg})[0]
      out <- ret.Interface()
    }
    close(out)
  }()
  return out
}

func main() {
  var pipe1 chan interface{}
  pipe1 = seq(15)
  pipe2 := pipe(pipe1, fizzbuzz)
  pipe3 := pipe(pipe2, stdout)

  for {
    if _, ok := <-pipe3; !ok {
      break
    }
  }
}

func seq(i int) chan interface{} {
  out := make(chan interface{})
  go func() {
    for n := 0; n < i; n++ {
      out <- n + 1
    }
    close(out)
  }()
  return out
}

func fizzbuzz(val int) string {
  var str string
  if val%15 == 0 {
    str = "FizzBuzz"
  } else if val%3 == 0 {
    str = "Fizz"
  } else if val%5 == 0 {
    str = "Buzz"
  } else {
    str = fmt.Sprint(val)
  }
  return str
}

func stdout(val string) bool {
  fmt.Println(val)
  return true
}

発展させる

さて、なんとなくそれっぽいものを作ったのですが、ふと我に返るとこれって役に立つのでしょうか。
関数f1の結果をf2で処理して、その結果をさらにf3で処理する…という流れならば、単純に関数合成的なアプローチをとったほうが良さそうな気がします。

gof.go
for i := 1 ; 1 <= 10; 1++ {
  stdout(fizzbuzz(i))
}

便利に使うには以下のような機能が必要な気がしてきます。

  • 最下流の処理が終わった後の出力で、チャネルが出力されるとか、最終的な結果の配列がもどるとか、いろいろしたい。
  • 配列を渡して、途中のパイプラインでごにょごにょできて、最後配列で欲しい。でも最後channelで受け取りたい時もありそう。
  • 途中の処理で不要なデータを除外するフィルタリング機能的なこともできたらいい。
  • 並び替えのように、上流からきたデータ全体に対しての処理もできたらいい。

と、こんな感じのわがままボディで、ふわふわしてて、ぴょんぴょんしてる、パイプ掘り掘りパッケージを作ってみました。
上記の特徴から、「うさぎかな?」ということで usa という名前です。合衆国は関係ありません。

「1〜15の数字を生成する」「奇数だけに絞り込む」「FizzBuzz」「出力」という処理を行いたいときに

usa.Filter(seq(15))
   .Filter(odd)
   .Pipe(fizzbuzz)
   .Pipe(stdout)
   .Run().Wait() // パイプラインを実行し、処理が完了するまで待つ

というふうに書けます。
具体的には以下の様に書けるのですが。。。なんだか長くなって、あまり有り難みがない。

// 最上流処理は、インプットがないので第一引数を受け取らないよにする
// ついでにカリー化して、seq(15)とかseq(20)とか色々変えやすいようにする
seq := func(i int) func(usa.Ch,usa.Ch){
  return func(_,out usa.Ch){
    for n < 1; n <= i; n++{
      out <- n
    }
    close(out)
  }
}

// 中間処理でのフィルタ
odd := func(in, out usa.Ch) {
  for v := range in{
    n, ok = v.(int) 
    if ok && n%2 == 1{
      out <- n
    }
  }
  close(out)
}

// 上流からきたデータを必ず1つのデータで下流に流す場合は
fizzbuzz = func(i int) string{
  var str string
  if val%15 == 0 {
    str = "FizzBuzz"
  } else if val%3 == 0 {
    str = "Fizz"
  } else if val%5 == 0 {
    str = "Buzz"
  } else {
    str = fmt.Sprint(val)
  }
  return str
}

// true を返すのはダミー
stdout := func(s string) bool{
  fmt.Println(s)
  return true
}

// パイプラインっぽい処理
usa.Filter(seq(15)).Filter(odd).Pipe(fizzbuzz).Pipe(stdout).Run().Wait()
//=> 1
//   Fizz
//   Buzz
//   Fizz
//   7
//   Fizz
//   11
//   13
//   FizzBuzz

// 最終結果をチェネルで取得
u := usa.Filter(seq(15)).Filter(odd).Pipe(fizzbuzz).Pipe(stdout).Run()
for uv := range u{
  fmt.Println(uv)
}
//=> true
//   true
//   :

// Arrayで結果を取得
usa.Filter(seq(15)).Filter(odd).Pipe(fizzbuzz).Run().ToArray()
//=> [1 Fizz Buzz Fizz 7 Fizz 11 13 FizzBuzz]

参考

8
5
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5