0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Golang 基礎(並列処理) part3

Last updated at Posted at 2020-08-11

part2の続き

goroutine

goroutine: 並列処理。
go 処理 とかく。処理が終わらなくても、プログラムのコードは終了する。
対策①sync.WaitGroupを使う!

package main

import (
	"fmt"
	"sync"
)

func goroutine(s string, wg *sync.WaitGroup) {
	defer wg.Done() // 並列処理が終了したと伝える
	for i := 0; i < 5; i++ {
		fmt.Println(s)
	}
}

func normal(s string) {
	for i := 0; i < 5; i++ {
		fmt.Println(s)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // 処理するべき並列処理を1つ追加する

	go goroutine("world", &wg) // goroutine 並列処理
	normal("hello")

	wg.Wait() // wg.Addで追加した分,wg.Done()されるまで待機。(コードを終わらせない)
}

対策②channelを使う!
チャネルを受信するまで、受信する行でコードが止まるから。

package main

import "fmt"

func goroutine(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // sumをc(チャネル)に送信
}

func main() {
	s1 := []int{1, 2, 3, 4, 5}
	s2 := []int{11, 234, 324, 89382}
	c := make(chan int) // キューのように送られてきたものから順次入っていき、出ていく。 ex. 89951 15 

	go goroutine(s1, c)
	go goroutine(s2, c)
	x := <-c // cに何か入ってくるまでコードがここで止まる!!
	fmt.Println("x:", x)
	y := <-c
	fmt.Println("y:", y)
}

buffered channel

channelの中に入ってくるデータ(buffer)の個数を指定したチャネル。

ch := make(chan int, 2) // 最大2個までしか受け入れない!
ch <- 100
fmt.Println(len(ch)) // 1
ch <- 200
fmt.Println(len(ch)) // 2

channelのrangeとclose

rangeで取り出すときは必ずclose()を一緒に使う!

package main

import "fmt"

func goroutine(s []int, c chan int)  {
	sum := 0
	for _, v := range s{
		sum += v
		c <- sum
	}
	close(c) // rangeで取り出す場合には、close()でもうこれ以上データが入ってこないと伝える必要がある!
}

func main() {
	s := []int{1, 2, 3, 4, 5}
	c := make(chan int, len(s))

	go goroutine(s, c)
	// 受信したデータを順次forループで取り出していく
	for i := range c{
		fmt.Println(i)
	}
}

producerとconsumer

package main

import (
	"fmt"
	"sync"
)

func producer(ch chan int, i int) {
	// Something ex.どこかのサーバーに行ってログを取ってくる
	ch <- i * 2
}

func consumer(ch chan int, wg *sync.WaitGroup) {
	for i := range ch {
		func() {
			defer wg.Done()
			// 取ってきたデータをもとになにか処理する。 ex.取ってきたログをsaveする
			fmt.Println("process", i*1000)
		}()
	}
}

func main() {
	var wg sync.WaitGroup
	ch := make(chan int)

	// Producer 複数のgoroutineでなにか処理(データを取ってきたり)する
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go producer(ch, i)
	}

	// Consumer Producerで取ってきたデータを順次処理する。
	go consumer(ch, &wg)
	wg.Wait()
	close(ch)
}

fan-out fan-in

並列処理を複数段階に分ける! 例えば役割ごとに処理を分けるなど。

package main

import "fmt"

func producer(first chan int) {
	defer close(first)
	for i := 0; i < 10; i++ {
		first <- i
	}
}

func multi2(first <-chan int, second chan<- int) {
	defer close(second)
	for i := range first {
		second <- i * 2
	}
}

func multi4(second <-chan int, third chan<- int) {
	defer close(third)
	for i := range second {
		third <- i * 4
	}
}

func main() {
	first := make(chan int)
	second := make(chan int)
	third := make(chan int)

	go producer(first)
	go multi2(first, second)
	go multi4(second, third)
	for result := range third{
		fmt.Println(result)
	}
}

channelとselect

違う並列処理を複数走らせて、お互いにブロッキングしないようにするときに使う。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func goroutine1(ch chan string) {
	for {
		// ex.ネットワーク越しにパケットを受信するイメージ
		ch <- "packet from 1"
		time.Sleep(3 * time.Second)
	}
}

func goroutine2(ch chan int) {
	for {
		ch <- rand.Intn(100)
		time.Sleep(1 * time.Second)
	}
}

func main() {
	c1 := make(chan string)
	c2 := make(chan int)
	go goroutine1(c1)
	go goroutine2(c2)

	// 違う並列処理を複数走らせて、お互いにブロッキングしないようにするときに使う
	for {
		select {
		case msg := <-c1:
			fmt.Println(msg)
		case num := <-c2:
			fmt.Println(num)
		}
	}
}

Default Selection: チャネルから受信してないときの処理
for selectから抜けるときは下記のOuterLoop(名前は何でも良い)のようにする。

package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	OuterLoop:
		for {
			select {
			case <-tick:
				fmt.Println("tick.")
			case <-boom:
				fmt.Println("BOOM!")
				break OuterLoop // for selectから抜けるときに使う!名前は何でも良い。
			default: // 受信してないときはdefaultの処理が行われる
				fmt.Println("    .")
				time.Sleep(50 * time.Millisecond)
			}
		}
	fmt.Println("##############")
}

sync.Mutex

下のようなコードだと、時々並列処理の書き換えがコンフリクトしてエラーが出る。

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(map[string]int)
	go func() {
		for i := 0; i < 10; i++ {
			c["key"] += 1
		}
	}()
	go func() {
		for i := 0; i < 10; i++ {
			c["key"] += 1
		}
	}()
	time.Sleep(1 * time.Second)
	fmt.Println(c, c["key"])
}

そこでsync.Mutexを以下のように使う!

package main

import (
	"fmt"
	"sync"
	"time"
)

type Counter struct {
	v map[string]int
	mux sync.Mutex
}

func (c *Counter) Increment(key string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v[key]++
}

func (c *Counter) Value(key string) int {
	c.mux.Lock()
	defer c.mux.Unlock()
	return c.v[key]
}

func main() {
	//c := make(map[string]int)
	c := Counter{v: make(map[string]int)}
	go func() {
		for i := 0; i < 10; i++ {
			//c["key"] += 1
			c.Increment("Key")
		}
	}()
	go func() {
		for i := 0; i < 10; i++ {
			c.Increment("Key")
		}
	}()
	time.Sleep(1 * time.Second)
	//fmt.Println(c, c["key"])
	fmt.Println(c, c.Value("Key"))
}

参考

現役シリコンバレーエンジニアが教えるGo入門 + 応用でビットコインのシストレFintechアプリの開発

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?