77
69

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ゴルーチンと並行性パターン

Last updated at Posted at 2015-03-04

第37回勉強会(2014/08/09) - 長岡 IT開発者 勉強会(NDS)

@hayajo

Goにおける並行性基礎

ゴルーチン

並行性のための基本機能。スレッドのようなもので、とても軽量。

goステートメントで生成される。

  • 関数
go logging("Hello, %s\n", name)
  • メソッド
go logger.Printf("Hello, %s\n", name)
  • クロージャ
go func() { // HL1
  log.Printf("Hello, %s\n", name)
}() // HL1

ゴルーチン: Example

package main

import (
	"log"
	"os"
	"time"
)

// logging forwards the format string and its arguments unchanged to
// log.Printf.
//
// NOTE(review): the parameter name fmt would shadow the standard fmt package
// if this file ever imported it.
func logging(fmt string, args ...interface{}) {
	log.Printf(fmt, args...)
}

// main prints the same greeting from three goroutines, started from a plain
// function call, a method call and a closure respectively.
func main() {
	const grace = 100 * time.Millisecond
	who := "Gopher"

	// 1. A plain function call.
	go logging("Hello, %s\n", who)

	// 2. A method call on a logger that writes to standard output.
	stdout := log.New(os.Stdout, "", log.Ldate|log.Ltime)
	go stdout.Printf("Hello, %s\n", who)

	// 3. An anonymous function closing over who.
	go func() {
		log.Printf("Hello, %s\n", who)
	}()

	// Nothing waits for the goroutines explicitly; the sleep only gives them
	// time to run before main returns.
	time.Sleep(grace)
}

同期

syncパッケージで提供される基本的な同期機能。

  • sync.Mutex (ミューテックス)
  • sync.Once (pthread_once(3))
  • sync.WaitGroup (カウンタセマフォ)
  • sync.Pool (フリーリスト)

同期: Example - sync.WaitGroup

package main

import (
	"fmt"
	"sync"
)

// main prints a greeting from a goroutine and uses a sync.WaitGroup to block
// until that goroutine has finished.
func main() {
	var pending sync.WaitGroup

	greet := func(who string) {
		defer pending.Done()
		fmt.Printf("Hello, %s\n", who)
	}

	pending.Add(1)
	go greet("Gopher")

	pending.Wait()
}

チャネル

ゴルーチン間の通信を提供。双方向パイプに近い。メッセージ型を指定して利用する。

doneCh := make(chan struct{})

送受信はバッファサイズ分でブロックされる。(デフォルトはバッファなし)

go func() {
  fmt.Printf("Hello, %s\n", name)
  doneCh <- struct{}{} // blocking until ready to read
}()

<-doneCh // blocking until a value is received

引数や戻り値にも使用できる。

チャネル: Example

package main

import "fmt"

// main prints a greeting from a goroutine and waits for it by receiving a
// completion signal over an unbuffered channel.
func main() {
	finished := make(chan struct{})

	go func(who string) {
		fmt.Printf("Hello, %s\n", who)
		finished <- struct{}{} // blocks until main is ready to receive
	}("Gopher")

	<-finished // blocks until the goroutine has sent its signal
}

並行性パターン

Background jobs

package main

import (
	"fmt"
	"strings"
	"sync"
)

// main upper-cases each word in its own goroutine and waits until every one
// of them has printed its result.
func main() {
	words := []string{"foo", "bar", "baz"}

	var jobs sync.WaitGroup
	jobs.Add(len(words))

	for i := range words {
		go func(w string) {
			defer jobs.Done()
			fmt.Println(strings.ToUpper(w))
		}(words[i])
	}

	jobs.Wait() // returns once all goroutines have called Done
}

Unbuffered channel

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// init seeds the global math/rand source with the current time in
// nanoseconds.
//
// NOTE(review): rand.Seed is reported as deprecated in newer Go releases,
// where the global source is seeded automatically — confirm the target Go
// version before removing this.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// from picks a pseudo-random integer in [0, 100) and sends it on conCh.
func from(conCh chan<- int) {
	n := rand.Intn(100)
	conCh <- n // blocks until the value can be delivered
}

// to receives a single value from conCh and prints it to standard output.
func to(conCh <-chan int) {
	v := <-conCh // blocks until a value is available
	fmt.Printf("Received %d\n", v)
}

// main connects a sender and a receiver goroutine through an unbuffered
// channel and waits until the receiver has printed the value.
//
// Completion is signalled by closing done rather than by sleeping for a fixed
// time, so the program can neither exit before the value is printed nor wait
// longer than necessary.
func main() {
	conCh := make(chan int)
	done := make(chan struct{})

	go from(conCh)
	go func() {
		defer close(done)
		to(conCh)
	}()

	<-done // blocks until the receiver has finished
}

Buffered channel

package main

import (
	"log"
	"math/rand"
	"time"
)

// main runs ten jobs while allowing at most three of them to execute at the
// same time, using a buffered channel as a counting semaphore.
func main() {
	const (
		jobs        = 10
		maxParallel = 3
	)

	slots := make(chan struct{}, maxParallel)

	run := func(id int) {
		defer func() { <-slots }() // give the slot back when the job ends
		log.Println("BEGIN", id)
		time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
		log.Println("END", id)
	}

	for id := 0; id < jobs; id++ {
		slots <- struct{}{} // blocks while maxParallel jobs are running
		go run(id)
	}

	// Taking every slot ourselves can only succeed once all jobs have
	// released theirs, so this doubles as "wait for everything to finish".
	for n := cap(slots); n > 0; n-- {
		slots <- struct{}{}
	}
}

Reading from multiple channels - select

package main

import "log"

// main logs every number produced by a goroutine and then the total,
// multiplexing the two channels with select.
func main() {
	counts := make(chan int)
	total := make(chan int)

	go func() {
		sum := 0
		for n := 0; n < 1000; n++ {
			counts <- n
			sum += n
		}
		total <- sum
	}()

	for {
		select {
		case n := <-counts:
			log.Println(n)
		case sum := <-total:
			// The total is sent last, so nothing is left to do afterwards.
			log.Println("SUM", sum)
			return
		}
	}
}

Consume all values from a channel - range

package main

import (
	"log"
	"time"
)

// main logs the numbers 0..9 as a goroutine produces them, one per second,
// and stops when the producer closes the channel.
func main() {
	numbers := make(chan int)

	go func() {
		defer close(numbers) // ends the range loop in main
		for n := 0; n < 10; n++ {
			numbers <- n
			time.Sleep(time.Second)
		}
	}()

	for n := range numbers {
		log.Println(n)
	}
}

Signaling

package main

import (
	"log"
	"sync"
)

// main parks 1000 goroutines on a shared channel, releases them all at once
// by closing it, and waits for every goroutine to log its index.
func main() {
	const workers = 1000

	var done sync.WaitGroup
	done.Add(workers)

	gate := make(chan struct{})

	for id := 0; id < workers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // blocks until the gate is closed
			log.Println(id)
		}(id)
	}

	close(gate) // a closed channel unblocks every waiting receiver

	done.Wait()
}

NOTE:

チャネルからの読み込みは値とチャネルの状態を返す。

value, isOpen := <- channel

チャネルが閉じられている場合、値はゼロ値、状態はfalseとなり読み込みはブロックされない。

Safe counter

package main

import (
	"log"
	"sync"
)

// counter starts a goroutine that sends 1, 2, 3, ... on ch without end.
// Each send blocks until some receiver takes the value, so every receiver
// gets a distinct number.
func counter(ch chan<- int64) {
	go func() {
		for next := int64(1); ; next++ {
			ch <- next
		}
	}()
}

// main lets 1000 goroutines each draw one number from the shared counter
// channel and log it together with the goroutine's own index.
func main() {
	const workers = 1000

	tickets := make(chan int64)
	counter(tickets)

	var done sync.WaitGroup
	done.Add(workers)

	gate := make(chan struct{})

	for id := 0; id < workers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // wait for the common start signal
			n := <-tickets
			log.Printf("#%03d: % 4d\n", id, n)
		}(id)
	}

	close(gate)

	done.Wait()
}

NOTE:

単純なカウンタであれば sync/atomic パッケージでも実現可能。

Timer

package main

import (
	"container/ring"
	"fmt"
	"log"
	"time"
)

// aa holds the ASCII-art frames of the animation, in display order.
var aa = []string{
	"ヽ(゚∀゚)ノ",
	"( ゚∀)ノ",
	"(  ゚)ノ",
	"ヽ(  )ノ",
	"ヽ(゚  )",
	"ヽ(∀゚ )ノ",
}

// main plays the aa animation on the terminal for five seconds and then logs
// that the time is up.
//
// A goroutine cycles through the frames stored in a ring and sends one every
// 100ms; the main loop prints frames until the time.After channel fires.
func main() {
	ch := make(chan string)

	go func() {
		r := ring.New(len(aa))

		for _, s := range aa {
			r.Value = s
			r = r.Next()
		}

		// After len(aa) steps the ring has wrapped around, so r points at the
		// first frame again. Send before advancing so that the animation
		// really starts with the first frame instead of the second one.
		for {
			ch <- fmt.Sprintf("%v", r.Value)
			r = r.Next()
			time.Sleep(100 * time.Millisecond)
		}
	}()

	timeoutCh := time.After(5 * time.Second)

loop:
	for {
		select {
		case v := <-ch:
			fmt.Print("\x0c") // form feed (new page)
			fmt.Println(v)
		case <-timeoutCh:
			log.Println("timed out")
			break loop
		}
	}
}

NOTE:

  • time.After
  • time.Tick
  • time.Sleep

Fan-out

処理を分散させる

package main

import (
	"log"
	"sync"
)

// main distributes the letters A..Z from a single channel over three worker
// goroutines; each worker logs the letters it happens to receive.
func main() {
	const workers = 3

	letters := make(chan byte)

	go func() {
		defer close(letters) // lets the workers' range loops finish
		for c := byte('A'); c <= 'Z'; c++ {
			letters <- c
		}
	}()

	var done sync.WaitGroup
	done.Add(workers)

	gate := make(chan struct{})

	for id := 0; id < workers; id++ {
		go func(id int) {
			defer done.Done()
			<-gate // wait for the common start signal
			for c := range letters {
				log.Printf("#%d %s", id, string(c))
			}
		}(id)
	}

	close(gate)

	done.Wait()
}

Fan-in

チャネルを多重化する

package main

import (
	"bytes"
	"log"
	"sync"
)

// fanIn merges any number of byte channels into a single channel.
//
// One goroutine per input forwards its values to the returned channel, which
// is closed after every input channel has been closed and drained. With no
// inputs the returned channel is closed right away.
func fanIn(ch ...<-chan byte) <-chan byte {
	merged := make(chan byte)

	var forwarders sync.WaitGroup
	forwarders.Add(len(ch))

	forward := func(src <-chan byte) {
		defer forwarders.Done()
		for v := range src {
			merged <- v
		}
	}

	for i := range ch {
		go forward(ch[i])
	}

	// Close the output only after every forwarder is done, so no forwarder
	// can send on a closed channel.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}

// main feeds A..Z and a..z into two channels, merges them with fanIn and
// logs all received bytes as one string.
func main() {
	upper := make(chan byte)
	lower := make(chan byte)

	go func() {
		// Deferred calls run last-in first-out: upper is closed before lower.
		defer close(lower)
		defer close(upper)

		for off := byte(0); off < 26; off++ {
			upper <- 'A' + off
			lower <- 'a' + off
		}
	}()

	var collected bytes.Buffer

	for c := range fanIn(upper, lower) {
		collected.WriteByte(c)
	}

	log.Println(collected.String())
}

Coroutine

コルーチン - Wikipedia

package main

import "log"

// hello starts a goroutine that sends three fixed strings on yield, one
// after another. Each send blocks until the caller receives the value.
func hello(yield chan<- string) {
	steps := [...]string{"Hello", "Gopher", "Hello NDS"}

	go func() {
		for _, s := range steps {
			yield <- s
		}
	}()
}

// main pulls the three values from hello one at a time and logs each of them
// with different surrounding text.
func main() {
	resume := make(chan string)

	hello(resume)

	first := <-resume
	log.Println(first, "World")

	second := <-resume
	log.Println("Hello", second)

	third := <-resume
	log.Println(third)
}

Generator

ジェネレータ (プログラミング) - Wikipedia

package main

import (
	"log"
	"math/rand"
	"time"
)

// init seeds the global math/rand source with the current time in
// nanoseconds.
//
// NOTE(review): rand.Seed is reported as deprecated in newer Go releases,
// where the global source is seeded automatically — confirm the target Go
// version before removing this.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// generator returns a channel that delivers n pseudo-random integers and is
// closed afterwards. For n <= 0 the channel is closed without any value.
func generator(n int) <-chan int {
	out := make(chan int)

	go func() {
		defer close(out)
		for remaining := n; remaining > 0; remaining-- {
			out <- rand.Int()
		}
	}()

	return out
}

// main logs the 100 values produced by generator and returns once the
// channel has been closed.
func main() {
	values := generator(100)

	for {
		v, open := <-values
		if !open {
			return
		}
		log.Println(v)
	}
}

Future

future - Wikipedia

package main

import (
	"fmt"
	"log"
	"strconv"
	"sync"
)

// futureUint64 is a uint64 result that is delivered over ch and read at most
// once, the first time the value is asked for.
type futureUint64 struct {
	value uint64      // the received result; set by resolve
	ch    chan uint64 // delivers the result
	once  sync.Once   // ensures ch is read only once
}

// String returns the result in decimal notation. The first call blocks until
// the result has arrived on ch; later calls return the stored value.
func (f *futureUint64) String() string {
	f.once.Do(f.resolve)
	return strconv.FormatUint(f.value, 10)
}

// resolve receives the result from ch and stores it in value.
func (f *futureUint64) resolve() {
	f.value = <-f.ch
}

// Fibonacci starts computing the n-th Fibonacci number in a goroutine and
// immediately returns a future for it. Formatting the returned fmt.Stringer
// blocks until the result is available.
func Fibonacci(n uint64) fmt.Stringer {
	f := new(futureUint64)
	// A buffer of one lets the goroutine deliver its result and exit even if
	// the future is never read; with an unbuffered channel it would stay
	// blocked on the send forever.
	f.ch = make(chan uint64, 1)
	go func() {
		f.ch <- fib(n)
	}()
	return f
}

// fib returns the n-th Fibonacci number, with fib(0) = 0 and fib(1) = 1.
//
// The value is computed iteratively in n steps instead of by double
// recursion, whose running time grows exponentially with n. Results that do
// not fit in a uint64 wrap around, as they did with the recursive version.
func fib(n uint64) uint64 {
	var cur, next uint64 = 0, 1
	for i := uint64(0); i < n; i++ {
		cur, next = next, cur+next
	}
	return cur
}

// main requests the 20th Fibonacci number as a future and logs it; formatting
// the future blocks until the computation has finished.
func main() {
	f := Fibonacci(20)
	// log.Println already separates its operands with a space, so the message
	// must not end in one.
	log.Println("The 20th Fibonacci number is", f)
}

Actor model

アクターモデル - Wikipedia

package main

import "log"

// actor is a send-only channel of arbitrary values; holders of an actor can
// send to it but cannot receive from it.
type actor chan<- interface{}

// PutReq is a message carrying a key and the value to associate with it.
type PutReq struct {
	key, val string
}

// GetReq is a message carrying a key to look up and a channel on which the
// answer is expected.
type GetReq struct {
	key string
	ack chan<- string // receives the looked-up value
}

// NewMapActor starts a goroutine that owns a map[string]string and returns
// the send-only mailbox used to talk to it.
//
// The goroutine handles one message at a time: a PutReq stores val under key,
// and a GetReq sends the value stored under key (the empty string if there is
// none) on its ack channel. Messages of any other type are ignored.
//
// Closing the returned channel stops the goroutine.
func NewMapActor() actor {
	a := make(chan interface{})

	go func() {
		m := make(map[string]string)

		// Ranging over the mailbox ends the goroutine when the channel is
		// closed. A bare receive in an endless loop would instead keep
		// yielding nil from the closed channel and spin forever.
		for msg := range a {
			switch r := msg.(type) {
			case PutReq:
				m[r.key] = r.val
			case GetReq:
				r.ack <- m[r.key]
			}
		}
	}()

	return actor(a)
}

// main stores one key/value pair in a map actor, asks for it again and logs
// the answer.
func main() {
	store := NewMapActor()
	reply := make(chan string)

	store <- PutReq{key: "hoge", val: "HOGEHOGE"}
	store <- GetReq{key: "hoge", ack: reply}

	log.Println(<-reply)
}

Appendix

並列度をあげる

Go 1.4 以前のデフォルトは1(Go 1.5 以降のデフォルトは利用可能なCPUコア数)。変更は runtime.GOMAXPROCS(int) で行う。

runtime.GOMAXPROCS(2)

CPUコア数は runtime.NumCPU() で取得する。

runtime.GOMAXPROCS(runtime.NumCPU())

現在のゴルーチン数を取得

package main

import (
	"log"
	"runtime"
)

// main logs the number of goroutines that currently exist.
func main() {
	n := runtime.NumGoroutine()
	log.Printf("running %d goroutines", n)
}
  • 現在のメモリ情報を取得
package main

import (
	"encoding/json"
	"fmt"
	"runtime"
)

// main reads the runtime's memory statistics and prints them to standard
// output as indented JSON.
func main() {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	b, err := json.MarshalIndent(m, "", "  ")
	if err != nil {
		// Do not print an empty line as if it were the statistics; fail
		// loudly instead.
		panic(err)
	}
	fmt.Println(string(b))
}
77
69
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
77
69

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?