LoginSignup
7

More than 1 year has passed since last update.

Golangでバッチとgoroutinesとgo channelsと比較してみましょ

Last updated at Posted at 2022-12-22

はじめに

初めまして。HRBrainでソフトウェアエンジニアをしているビクトルと申します。

この記事はHRBrain Advent Calendar 2022カレンダー1の23日目の記事です。

今年の4月にスペインから日本へ引っ越しまして、はじめてアドベントカレンダーをやっていきます!

私はバックエンドとDevOpsが好きなのでGolangのgoroutinesとgo channelsのパーフォマンスについてお話したいと思っています。

比較する方法にGrafanaとPrometheusを使いたいと思っています。

まず、簡単でchannelsとgoroutinesを紹介しています。

channelsとは

channelsはバッファーと同じ仕方をしています。channelを作成方法はこちらです。バッファーが空と満になってプロセスがブロックされています。

書き方はこれになっています:

bufferChannel := make(chan[channel direction] T, [size<T>])

goroutinesとは

goroutinesはスレッドと同じ仕方をしています。
開発者が望むようにプログラムを動かすには、スレッドと併存パターンを組み合わせなければなりません。
書き方はこれになっています:

go "func" FunctionName [ TypeParameters ] Signature [ FunctionBody ]

Hands On

golangでバッチを実装するとき何の技術を使いますか?たまに開発者さんこの質問を自分で聞いていると思いますのでここで試してみましょう。

まず、要件について。

  • docker-compose
  • docker
  • golang バージョン 1.17 から

では、私が使っている監視用のコンポーネントについてです。

  • Prometheus
  • Grafana

PrometheusとGrafanaについてはネットで検索するとたくさん出てくるので、1〜2行で紹介します。

  • Grafana:データソースから表現、変換、その他のアグリゲーションを行い、ユーザーに表示するためのツール
  • Prometheus:CPUやメモリ使用量、ヒープメモリやガベージコレクタなどのメトリクスデータを取得するためのツール

さあ、レッツHands On

今回のプログラムは、以下のようなことを行います。

  • 10個のcsvファイルを開く
  • その内容(1000行)を読み、いくつかの集計を行う
  • 結果を1つのファイルに書き出す

併存を使わないで、通常のバッチを見てみましょう。

cmd/normal_process/main.go

func (f *fileOperator) Read() (Users, error) {
	var users Users
	for _, fd := range f.files {
		r := csv.NewReader(fd)
		r.Comment = '#'
		r.Comma = ','

		d, err := r.ReadAll()
		if err != nil {
			return nil, fmt.Errorf("FileOperator: read file %s error: %w", fd.Name(), err)
		}

		if len(d) == 0 {
			return nil, nil
		}

		users = make(Users, len(d)-1, len(d))
		for i, l := range d {
			if i == 0 {
				continue
			}
			if len(l) == 0 {
				continue
			}

			id, _ := strconv.Atoi(l[0])
			td, _ := time.Parse("2006/01/02", l[6])
			users[i-1] = User{
				ID:        id,
				FirstName: l[1],
				LastName:  l[2],
				Email:     l[3],
				Gender:    l[4],
				Country:   l[5],
				Birthday:  td,
			}
		}
	}
	return users, nil
}

func (f *fileOperator) Write(users Users) error {
	w := csv.NewWriter(f.writeFile)
	d := make([][]string, 0)
	d = append(d, []string{"ID", "Name", "Email", "Country", "Gender", "Birthday"})
	for _, user := range users {
		d = append(d, user.ToArray())
	}
	if err := w.WriteAll(d); err != nil {
		return fmt.Errorf("FileOperator: write data in the file: %w", err)
	}
	return nil
}

実行しながらGrafanaを見ましょう。

Screenshot 2022-11-29 at 15.36.39.png

Screenshot 2022-11-29 at 15.36.50.png

Screenshot 2022-11-29 at 15.37.07.png

次はgoroutinesを

cmd/go_routines/main.goファイルで

func (f *fileOperator) ReadGoRoutine() Users {
	if len(f.files) == 0 {
		return nil
	}

	users := make(Users, 0)

	wg := sync.WaitGroup{}
	wg.Add(len(f.files))
	mtx := sync.Mutex{}

	for _, fd := range f.files {
		go func(fd *os.File, users *Users) {
			defer wg.Done()

			r := csv.NewReader(fd)
			r.Comment = '#'
			r.Comma = ','

			d, err := r.ReadAll()
			if err != nil {
				return
			}

			for i, l := range d {
				if i == 0 {
					continue
				}
				id, _ := strconv.Atoi(l[0])
				td, _ := time.Parse("2006/01/02", l[6])
				mtx.Lock()
				*users = append(*users, User{
					ID:        id,
					FirstName: l[1],
					LastName:  l[2],
					Email:     l[3],
					Gender:    l[4],
					Country:   l[5],
					Birthday:  td,
				})
				mtx.Unlock()
			}
		}(fd, &users)
	}

	wg.Wait()
	return users
}

func (f *fileOperator) WriteGoRoutine(users Users) error {
	w := csv.NewWriter(f.writeFile)

	mtx := sync.Mutex{}
	wg := sync.WaitGroup{}
	wg.Add(len(users))
	for _, user := range users {
		go func(user User) {
			defer wg.Done()
			mtx.Lock()
			defer mtx.Unlock()
			if err := w.Write(user.ToArray()); err != nil {
				return
			}
		}(user)
	}
	wg.Wait()
	return nil
}

Grafanaはこれになっています

Screenshot 2022-11-29 at 15.43.17.png

Screenshot 2022-11-29 at 15.43.32.png

Screenshot 2022-11-29 at 15.43.44.png

最後はchannelsを使いましょう

cmd/channels/main.goのファイル

func (f *fileOperatorChannel) Read(in <-chan UserStream) <-chan UserStream {
	out := make(chan UserStream, 5) // after 5 items the channel blocks

	go func() {
		for stream := range in {
			r := csv.NewReader(stream.file)
			r.Comment = '#'
			r.Comma = ','

			d, err := r.ReadAll()
			if err != nil {
				return
			}

			users := make(Users, 0)

			for i, l := range d {
				if i == 0 {
					continue
				}
				id, _ := strconv.Atoi(l[0])
				td, _ := time.Parse("2006/01/02", l[6])

				users = append(users, User{
					ID:        id,
					FirstName: l[1],
					LastName:  l[2],
					Email:     l[3],
					Gender:    l[4],
					Country:   l[5],
					Birthday:  td,
				})
			}

			out <- UserStream{
				file:  stream.file,
				users: users,
			}
		}
		close(out)
	}()

	return out
}

func (f *fileOperatorChannel) Write(done chan struct{}, in <-chan UserStream) {
	w := csv.NewWriter(f.writeFile)
	d := make([][]string, 0)
	
	go func() {
		defer close(done)
		for stream := range in {
			for _, user := range stream.users {
				d = append(d, user.ToArray())
			}
			if err := w.WriteAll(d); err != nil {
				log.Printf("error writing file with data %v: %v\n", d, err)
			}
		}
		done <- struct{}{}
	}()
}

Screenshot 2022-11-29 at 15.50.37.png

Screenshot 2022-11-29 at 15.50.47.png

Screenshot 2022-11-29 at 15.50.26.png

まとめて

グラフィックを見て、どちらを使うべきか?それは、何を実現したいのか、読み書きの順序を守りたいのか、処理を高速に実行したいのか、システムにCPUやメモリのリソースがあまりないのか、によって異なります。それは私たち次第です。
この記事では、各技術を比較し、多くの楽しみを得ることができます。

終わりに

HRBrainでは、コンピュータサイエンスに興味があり、DevOpsの方法論に従ってお客様のためにクールなプロダクトを作り上げる。プログラミングが好きな方、チャレンジを探しています方、ぜひご連絡ください → https://www.hrbrain.co.jp/recruit

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7