概要
たまに go で書く時のためにチャネルの定番パターンをメモする.
go は簡単なツールを作成するために使う程度のものなので、確実な動作は保証しないし、context 終了時の動作はあまり精査しきれていない。
- FanIn
- FanOut
- (opt) Transform
FanIn
複数のチャネルを一つのチャネルにまとめて受信できるようにする
// 複数チャネルから受信した値を単一のチャネルにまとめて受信できるようにする
func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T {
ch := make(chan T)
// 複数チャネルを一つのチャネルにまとめるグルーチン作成
var wg sync.WaitGroup
for c := range slices.Values(in) {
wg.Add(1)
go func() {
defer wg.Done()
for v := range c {
select {
case ch <- v:
case <-ctx.Done():
return
}
}
}()
}
// ソースとなるチャネルがすべてクローズしたら FanIn チャネルを閉じる
go func() {
wg.Wait()
close(ch)
}()
return ch
}
FanOut
一つのチャネルで受け取った値を複数のチャネルで受け取れるようにする
// 一つのチャネルから受信した値を複数のチャネルで受信できるようにする
func FanOut[T any](ctx context.Context, out <-chan T, split uint8) []<-chan T {
ch := make([]chan T, split)
for i := range split {
ch[i] = make(chan T)
}
// 一つのチャネルで受け取った値を複数のチャネルで受信できるようにするゴルーチン
go func() {
// ソースとなるチャネルが閉じた or コンテキストが終了したため Fanout チャネルを閉じる
defer func() {
for c := range slices.Values(ch) {
close(c)
}
}()
// Fanout チャネルに送信する
for v := range out {
for c := range slices.Values(ch) {
select {
case c <- v:
case <-ctx.Done():
return
}
}
}
}()
// 受信専用チャネルにして返す
res := make([]<-chan T, split)
for i := range split {
res[i] = ch[i]
}
return res
}
Transform
チャネルから受信した値を別の型に変換して別のチャネルで受信できるようにする
// / チャネルから受信した値を別の型に変換して別のチャネルに送信する
func Transform[S any, T any](ctx context.Context, in <-chan S, transform func(S) T) <-chan T {
ch := make(chan T)
go func() {
defer close(ch)
for v := range in {
select {
case ch <- transform(v):
case <-ctx.Done():
return
}
}
}()
return ch
}