52
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Systemi(株式会社システムアイ)Advent Calendar 2023

Day 21

Golang: 並行処理で何百万ものデータを簡単にフェッチする

Last updated at Posted at 2023-12-21

Golang: 並行処理で何百万ものデータを簡単にフェッチする

紹介

ソフトウェア・エンジニアとして、私たちはより少ない労力とメンテナンスで非常に強力なシステムを提供することを求められている。バックエンドの皆さんは、docやCSV、APIから何百万ものデータをフェッチするタスクがあるかもしれません。

巨大なデータをフェッチする際の問題は以下の通りです:

  1. パフォーマンス
  2. リソースの利用
  3. データのフィルタリングとソート

ケーススタディ

例えば、商品データを取得するためにパートナーのAPIと統合する必要があり、10分ごとに新鮮なデータを取得し、データが変更された場合はDBにアップサートする必要があるとします。

データのフェッチにいくつかの同時実行を使用することをお勧めします。データの異なるバッチで5つの同時実行があるとします。そうすると

  1. スレッド1は1-100000のデータを取得するために実行されます。
  2. スレッド2が100001-200000のデータをフェッチする。
  3. 別のスレッドで続行

並行処理とは

並行処理とは、ハードウェアとソフトウェアに応じて、コンピュータが複数のタスクを同時に実行する能力のことである。

実装

Golangでは、sync.WaitGroupを使って複数のプロセス(goroutine)を実行し、データの取得を処理することができます。

sync.WaitGroupはgoroutineの待機に使われる。実装はとても簡単で、Add()を使って実行したいゴルーチンの数を設定できる。

package main

import "sync"
import "runtime"
import "fmt"

var LIST_PRODUCT_TYPE = [3]string{"food", "electronics", "clothing"}

type GetListProductResponse struct {
    Data []ProductListResponse `json:"data"`
}

type ProductListResponse struct {
    Code         string `json:"code"`
    Name         string `json:"name"`
    Price        string `json:"price"`
    Status       bool   `json:"status"`
}

func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
    // calling endpoint 3rd party
    // parse to response
    // and return the data
    return &productList, nil
}

func main() {
    wg := sync.WaitGroup{}
    doneChan := make(chan interface{}, 1) //シグナル待ち用
    productsChan := make(chan *GetListProductResponse) //実行するすべてのゴルーチンからのプロダクト・データを共有するためのチャンネルです。
    errChan := make(chan error)  //各ゴルーチンのデータ取得時のエラーを共有するためのチャンネル

//このループは商品データを同時に取得する
    for key := range LIST_PRODUCT_TYPE {
        wg.Add(1)
        req := &GetProductListRequest{
            ProductType: LIST_PRODUCT_TYPE[key],
        }
        go func() {
            defer wg.Done()
            products, err := getProductList(ctx, req)
            if err != nil {
                errChan <- err
                return
            }
            productsChan <- products
        }()
     }

     
//このコードは、sync.WaitGroup (wg)が完了するのを待ち、
//nil値をチャネル(doneChan)に送信してタスクが完了した
//ことを知らせる新しいgoroutineを開始している。

     go func() {
         wg.Wait()

         doneChan <- nil
     }()

     var (
         catalogues GetListProductResponse
         data       []ProductListResponse
     )

 // このforループは無限に実行され、その中にselect文があり、これは複数のチャンネルを同時に処理するために使われる。
     for {
         select {
         case <-ctx.Done():
             return nil, ctx.Err()
         case err := <-errChan:
             return nil, err
         case products := <-productsChan:
             data = append(data, products.Data...)
             catalogues.Data = data
         case <-doneChan:
         return &catalogues, nil
     }
   }
}

これがGolangでデータを同時に取得する例です。

52
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
52
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?