part2の続き
goroutine
goroutine: 並列処理。
go 処理 とかく。処理が終わらなくても、プログラムのコードは終了する。
対策①sync.WaitGroupを使う!
package main
import (
"fmt"
"sync"
)
func goroutine(s string, wg *sync.WaitGroup) {
defer wg.Done() // 並列処理が終了したと伝える
for i := 0; i < 5; i++ {
fmt.Println(s)
}
}
func normal(s string) {
for i := 0; i < 5; i++ {
fmt.Println(s)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1) // 処理するべき並列処理を1つ追加する
go goroutine("world", &wg) // goroutine 並列処理
normal("hello")
wg.Wait() // wg.Addで追加した分,wg.Done()されるまで待機。(コードを終わらせない)
}
対策②channelを使う!
チャネルを受信するまで、受信する行でコードが止まるから。
package main
import "fmt"
func goroutine(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // sumをc(チャネル)に送信
}
func main() {
s1 := []int{1, 2, 3, 4, 5}
s2 := []int{11, 234, 324, 89382}
c := make(chan int) // キューのように送られてきたものから順次入っていき、出ていく。 ex. 89951 15
go goroutine(s1, c)
go goroutine(s2, c)
x := <-c // cに何か入ってくるまでコードがここで止まる!!
fmt.Println("x:", x)
y := <-c
fmt.Println("y:", y)
}
buffered channel
channelの中に入ってくるデータ(buffer)の個数を指定したチャネル。
ch := make(chan int, 2) // 最大2個までしか受け入れない!
ch <- 100
fmt.Println(len(ch)) // 1
ch <- 200
fmt.Println(len(ch)) // 2
channelのrangeとclose
rangeで取り出すときは必ずclose()を一緒に使う!
package main
import "fmt"
func goroutine(s []int, c chan int) {
sum := 0
for _, v := range s{
sum += v
c <- sum
}
close(c) // rangeで取り出す場合には、close()でもうこれ以上データが入ってこないと伝える必要がある!
}
func main() {
s := []int{1, 2, 3, 4, 5}
c := make(chan int, len(s))
go goroutine(s, c)
// 受信したデータを順次forループで取り出していく
for i := range c{
fmt.Println(i)
}
}
producerとconsumer
package main
import (
"fmt"
"sync"
)
func producer(ch chan int, i int) {
// Something ex.どこかのサーバーに行ってログを取ってくる
ch <- i * 2
}
func consumer(ch chan int, wg *sync.WaitGroup) {
for i := range ch {
func() {
defer wg.Done()
// 取ってきたデータをもとになにか処理する。 ex.取ってきたログをsaveする
fmt.Println("process", i*1000)
}()
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
// Producer 複数のgoroutineでなにか処理(データを取ってきたり)する
for i := 0; i < 10; i++ {
wg.Add(1)
go producer(ch, i)
}
// Consumer Producerで取ってきたデータを順次処理する。
go consumer(ch, &wg)
wg.Wait()
close(ch)
}
fan-out fan-in
並列処理を複数段階に分ける! 例えば役割ごとに処理を分けるなど。
package main
import "fmt"
func producer(first chan int) {
defer close(first)
for i := 0; i < 10; i++ {
first <- i
}
}
func multi2(first <-chan int, second chan<- int) {
defer close(second)
for i := range first {
second <- i * 2
}
}
func multi4(second <-chan int, third chan<- int) {
defer close(third)
for i := range second {
third <- i * 4
}
}
func main() {
first := make(chan int)
second := make(chan int)
third := make(chan int)
go producer(first)
go multi2(first, second)
go multi4(second, third)
for result := range third{
fmt.Println(result)
}
}
channelとselect
違う並列処理を複数走らせて、お互いにブロッキングしないようにするときに使う。
package main
import (
"fmt"
"math/rand"
"time"
)
func goroutine1(ch chan string) {
for {
// ex.ネットワーク越しにパケットを受信するイメージ
ch <- "packet from 1"
time.Sleep(3 * time.Second)
}
}
func goroutine2(ch chan int) {
for {
ch <- rand.Intn(100)
time.Sleep(1 * time.Second)
}
}
func main() {
c1 := make(chan string)
c2 := make(chan int)
go goroutine1(c1)
go goroutine2(c2)
// 違う並列処理を複数走らせて、お互いにブロッキングしないようにするときに使う
for {
select {
case msg := <-c1:
fmt.Println(msg)
case num := <-c2:
fmt.Println(num)
}
}
}
Default Selection: チャネルから受信してないときの処理
for selectから抜けるときは下記のOuterLoop(名前は何でも良い)のようにする。
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
OuterLoop:
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
break OuterLoop // for selectから抜けるときに使う!名前は何でも良い。
default: // 受信してないときはdefaultの処理が行われる
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
fmt.Println("##############")
}
sync.Mutex
下のようなコードだと、時々並列処理の書き換えがコンフリクトしてエラーが出る。
package main
import (
"fmt"
"time"
)
func main() {
c := make(map[string]int)
go func() {
for i := 0; i < 10; i++ {
c["key"] += 1
}
}()
go func() {
for i := 0; i < 10; i++ {
c["key"] += 1
}
}()
time.Sleep(1 * time.Second)
fmt.Println(c, c["key"])
}
そこでsync.Mutexを以下のように使う!
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
v map[string]int
mux sync.Mutex
}
func (c *Counter) Increment(key string) {
c.mux.Lock()
defer c.mux.Unlock()
c.v[key]++
}
func (c *Counter) Value(key string) int {
c.mux.Lock()
defer c.mux.Unlock()
return c.v[key]
}
func main() {
//c := make(map[string]int)
c := Counter{v: make(map[string]int)}
go func() {
for i := 0; i < 10; i++ {
//c["key"] += 1
c.Increment("Key")
}
}()
go func() {
for i := 0; i < 10; i++ {
c.Increment("Key")
}
}()
time.Sleep(1 * time.Second)
//fmt.Println(c, c["key"])
fmt.Println(c, c.Value("Key"))
}