11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Goのgoroutine, channelを使って並列処理

Last updated at Posted at 2022-06-05

goのgoroutine,channelを使って並列処理をしてみました。
今回は郵便局のサイトから各都道府県の郵便番号データを全都道府県分ダウンロードした上で、
これを10件ずつ並列処理で解凍してDBに登録するところまでを試してみました。

郵便番号データは以下からダウンロード

https://www.post.japanpost.jp/zipcode/dl/oogaki-zip.html

前提環境

go1.8

# 郵便番号データのcsvはSJISの為、以下のパッケージをダウンロード
go get -u golang.org/x/text/encoding/japanese golang.org/x/text/transform

# DBはmysql ORMはgormを利用
go get -u github.com/go-sql-driver/mysql gorm.io/gorm

# DB定義情報はiniファイル管理にしたかったのでini.viパッケージをダウンロード
go get -u gopkg.in/ini.v1

パッケージの構成

今回はgoroutineのchannelの動きを試したかったので、構成は適当です。

<projectroot>
┝ main.go
┝ config.ini # DB接続情報定義
└ storage
  ┝ zip # 住所zipファイル群
  │ ┝ 01hokkai.zip # 住所zipファイル群
  │ ┝ 02aomori.zip
  │ ┝ 03iwate.zip
  │ ┝ 04miyagi.zip
  │ ┝ 05akita.zip
  │ │
  │  ...
  └ unzip # 住所zip解凍ファイル格納

処理

同時並列数をバッファサイズにしたチャネルを作り、
そのチャネルを待たせる事で並列処理を実現する。

slotsチャネルは指定した数まで受信したらバッファサイズ制限に到達する為、
<-slotsが呼ばれて解放されない限りは後続の処理は起動しない。

今回は並列処理数を10とした。
並列処理されているのを分かりやすくする為、5秒のSleepをいれた。

package main

import (
	"archive/zip"
	"encoding/csv"
	"fmt"
	"gorm.io/gorm"
	"golang.org/x/text/encoding/japanese"
	"golang.org/x/text/transform"
	"gopkg.in/ini.v1"
	"gorm.io/gorm/logger"
	"io"
	"log"
	"os"
	"sync"
	"time"
	"gorm.io/driver/mysql"
)

// RDS m_addressテーブル
type mAddress struct {
	ZipCode    string    `gorm:"column:zip_code;not null"`
	Prefecture string    `gorm:"column:prefecture;not null"`
	City       string    `gorm:"column:city;not null"`
	Area       string    `gorm:"column:area"`
	CreateUser string    `gorm:"column:create_user;not null"`
	Created    time.Time `gorm:"column:created;not null"`
	UpdateUser string    `gorm:"column:update_user;not null"`
	Updated    time.Time `gorm:"column:updated;not null"`
}

// ZIPファイル格納dir
const ZIP_FILE_PATH string = "storage/zip"

// 解凍ファイル格納dir
const UNZIP_FILE_PATH string = "storage/unzip"

var db *gorm.DB

func init() {
	// DB接続
	cfg, err := ini.Load("config.ini")
	if err != nil {
		panic(err.Error())
	}
	dbConn := fmt.Sprintf(
		"%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=True&loc=Local",
		cfg.Section("db").Key("db_user_name").String(),
		cfg.Section("db").Key("db_user_password").String(),
		cfg.Section("db").Key("db_host").String(),
		cfg.Section("db").Key("db_port").String(),
		cfg.Section("db").Key("db_name").String(),
	)
	db, err = gorm.Open(mysql.Open(dbConn),  &gorm.Config{})
	if err != nil {
		log.Fatalln(err)
	}
	db.Logger = db.Logger.LogMode(logger.Silent)
}

func main() {
	log.Println("start")

	db.Exec("TRUNCATE TABLE m_address")

	// ① チャンネルを定義
	slots := make(chan struct{}, 10)

	// ② WaitGroupを定義
	var wg sync.WaitGroup

	t := db.Begin()

	// ③ zipファイル分ループ
	for _, v := range getFileNames() {
		// ④ WaitGroupをインクリメント
		wg.Add(1)
		slots <- struct{}{}

		go func(a map[string]string) {
			defer wg.Done() // WaitGroupをデクリメント
			defer func(s chan struct{}){
				<-s // 処理が終わったらchannelを解放
			}(slots)

			// ④ zipファイル解凍 & insert
			if err := exec(fmt.Sprintf(ZIP_FILE_PATH + "/%s", a["file_name"])); err != nil {
				panic(err.Error())
			}

			// 処理を分かりやすくする為、Sleep
			time.Sleep(time.Second * 5)
		}(v)
	}

	wg.Wait()

	// commit
	t.Commit()

	// db close
	conn, err := db.DB()
	if err != nil {
		log.Fatalln(err)
	}
	conn.Close()

	log.Println("finished")
}

// ④ zipファイル解凍 & insert
func exec(src string) error {
	// ファイル解凍
	r, err := zip.OpenReader(src)
	if err != nil {
		return err
	}
	defer func(r *zip.ReadCloser) {
		_ = r.Close()
	}(r)

	for _, zipReadFile := range r.File {
		log.Printf(zipReadFile.Name)

		csvFilePath := fmt.Sprintf(UNZIP_FILE_PATH + "/%s", zipReadFile.Name)
		reader, err := zipReadFile.Open()
		if err != nil {
			return err
		}
		defer func(r io.ReadCloser) {
			_ = r.Close()
		}(reader)

		// File生成
		writeFile, err := os.Create(csvFilePath)
		if err != nil {
			return err
		}
		defer func(f *os.File) {
			_ = f.Close()
		}(writeFile)

		// コピー
		if _, err := io.Copy(writeFile, reader); err != nil {
			return err
		}

		csvFile, err := os.Open(csvFilePath)
		if err != nil {
			return err
		}
		defer func(f *os.File) {
			_ = f.Close
		}(csvFile)

		csvFileReader := csv.NewReader(transform.NewReader(csvFile, japanese.ShiftJIS.NewDecoder()))
		var line []string
		var mAddressList []mAddress
		for {
			line, err = csvFileReader.Read()
			if err != nil {
				break
			}
			address := mAddress{}

			// csvレコードを構造体に追加
			toStruct(line, &address)

			// 1レコードずつ追加
			mAddressList = append(mAddressList, address)
		}
		// insert!!!
		db.Table(`m_address`).Create(&mAddressList)
	}

	return nil
}

func toStruct(recode []string, a *mAddress) {
	a.ZipCode = recode[2]
	a.Prefecture = recode[6]
	a.City = recode[7]
	a.Area = recode[8]
	a.CreateUser = "system"
	a.Created = time.Now()
	a.UpdateUser = "system"
	a.Updated = time.Now()
}

// zipファイル情報を返却
func getFileNames() []map[string]string {
	return []map[string]string{
		{"prefecture_name": "北海道", "file_name": "01hokkai.zip"},
		{"prefecture_name": "青森県", "file_name": "02aomori.zip"},
		{"prefecture_name": "岩手県", "file_name": "03iwate.zip"},
		{"prefecture_name": "宮城県", "file_name": "04miyagi.zip"},
		{"prefecture_name": "秋田県", "file_name": "05akita.zip"},
		{"prefecture_name": "山形県", "file_name": "06yamaga.zip"},
		{"prefecture_name": "福島県", "file_name": "07fukush.zip"},
		{"prefecture_name": "茨城県", "file_name": "08ibarak.zip"},
		{"prefecture_name": "栃木県", "file_name": "09tochig.zip"},
		{"prefecture_name": "群馬県", "file_name": "10gumma.zip"},
		{"prefecture_name": "埼玉県", "file_name": "11saitam.zip"},
		{"prefecture_name": "千葉県", "file_name": "12chiba.zip"},
		{"prefecture_name": "東京都", "file_name": "13tokyo.zip"},
		{"prefecture_name": "神奈川県", "file_name": "14kanaga.zip"},
		{"prefecture_name": "新潟県", "file_name": "15niigat.zip"},
		{"prefecture_name": "富山県", "file_name": "16toyama.zip"},
		{"prefecture_name": "石川県", "file_name": "17ishika.zip"},
		{"prefecture_name": "福井県", "file_name": "18fukui.zip"},
		{"prefecture_name": "山梨県", "file_name": "19yamana.zip"},
		{"prefecture_name": "長野県", "file_name": "20nagano.zip"},
		{"prefecture_name": "岐阜県", "file_name": "21gifu.zip"},
		{"prefecture_name": "静岡県", "file_name": "22shizuo.zip"},
		{"prefecture_name": "愛知県", "file_name": "23aichi.zip"},
		{"prefecture_name": "三重県", "file_name": "24mie.zip"},
		{"prefecture_name": "滋賀県", "file_name": "25shiga.zip"},
		{"prefecture_name": "京都府", "file_name": "26kyouto.zip"},
		{"prefecture_name": "大阪府", "file_name": "27osaka.zip"},
		{"prefecture_name": "兵庫県", "file_name": "28hyogo.zip"},
		{"prefecture_name": "奈良県", "file_name": "29nara.zip"},
		{"prefecture_name": "和歌山県", "file_name": "30wakaya.zip"},
		{"prefecture_name": "鳥取県", "file_name": "31tottor.zip"},
		{"prefecture_name": "島根県", "file_name": "32shiman.zip"},
		{"prefecture_name": "岡山県", "file_name": "33okayam.zip"},
		{"prefecture_name": "広島県", "file_name": "34hirosh.zip"},
		{"prefecture_name": "山口県", "file_name": "35yamagu.zip"},
		{"prefecture_name": "徳島県", "file_name": "36tokush.zip"},
		{"prefecture_name": "香川県", "file_name": "37kagawa.zip"},
		{"prefecture_name": "愛媛県", "file_name": "38ehime.zip"},
		{"prefecture_name": "高知県", "file_name": "39kochi.zip"},
		{"prefecture_name": "福岡県", "file_name": "40fukuok.zip"},
		{"prefecture_name": "佐賀県", "file_name": "41saga.zip"},
		{"prefecture_name": "長崎県", "file_name": "42nagasa.zip"},
		{"prefecture_name": "熊本県", "file_name": "43kumamo.zip"},
		{"prefecture_name": "大分県", "file_name": "44oita.zip"},
		{"prefecture_name": "宮崎県", "file_name": "45miyaza.zip"},
		{"prefecture_name": "鹿児島県", "file_name": "46kagosh.zip"},
		{"prefecture_name": "沖縄県", "file_name": "47okinaw.zip"},
	}
}

出力

ログを見ると5秒おきに10件ずつ実行されている。

2022/06/05 04:14:31 start
2022/06/05 04:14:31 07FUKUSH.CSV
2022/06/05 04:14:31 01HOKKAI.CSV
2022/06/05 04:14:31 02AOMORI.CSV
2022/06/05 04:14:31 06YAMAGA.CSV
2022/06/05 04:14:31 03IWATE.CSV
2022/06/05 04:14:31 10GUMMA.CSV
2022/06/05 04:14:31 04MIYAGI.CSV
2022/06/05 04:14:31 08IBARAK.CSV
2022/06/05 04:14:31 05AKITA.CSV
2022/06/05 04:14:31 09TOCHIG.CSV

2022/06/05 04:14:36 11SAITAM.CSV
2022/06/05 04:14:36 12CHIBA.CSV
2022/06/05 04:14:36 13TOKYO.CSV
2022/06/05 04:14:36 14KANAGA.CSV
2022/06/05 04:14:36 15NIIGAT.CSV
2022/06/05 04:14:36 16TOYAMA.CSV
2022/06/05 04:14:36 17ISHIKA.CSV
2022/06/05 04:14:36 18FUKUI.CSV
2022/06/05 04:14:36 19YAMANA.CSV
2022/06/05 04:14:36 20NAGANO.CSV

2022/06/05 04:14:41 21GIFU.CSV
2022/06/05 04:14:42 22SHIZUO.CSV
2022/06/05 04:14:42 23AICHI.CSV
2022/06/05 04:14:42 24MIE.CSV
2022/06/05 04:14:42 25SHIGA.CSV
2022/06/05 04:14:42 26KYOUTO.CSV
2022/06/05 04:14:42 27OSAKA.CSV
2022/06/05 04:14:42 28HYOGO.CSV
2022/06/05 04:14:42 29NARA.CSV
2022/06/05 04:14:42 30WAKAYA.CSV

2022/06/05 04:14:46 31TOTTOR.CSV
2022/06/05 04:14:47 32SHIMAN.CSV
2022/06/05 04:14:47 33OKAYAM.CSV
2022/06/05 04:14:47 34HIROSH.CSV
2022/06/05 04:14:47 35YAMAGU.CSV
2022/06/05 04:14:47 36TOKUSH.CSV
2022/06/05 04:14:47 37KAGAWA.CSV
2022/06/05 04:14:47 38EHIME.CSV
2022/06/05 04:14:48 39KOCHI.CSV
2022/06/05 04:14:48 40FUKUOK.CSV

2022/06/05 04:14:51 41SAGA.CSV
2022/06/05 04:14:52 42NAGASA.CSV
2022/06/05 04:14:52 43KUMAMO.CSV
2022/06/05 04:14:52 44OITA.CSV
2022/06/05 04:14:52 45MIYAZA.CSV
2022/06/05 04:14:52 46KAGOSH.CSV
2022/06/05 04:14:52 47OKINAW.CSV
2022/06/05 04:14:58 finished

DBにもちゃんと登録されてそう。

+----------+------------+--------------+--------------------------------------+-------------+---------------------+-------------+---------------------+
| zip_code | prefecture | city         | area                                 | create_user | created             | update_user | updated             |
+----------+------------+--------------+--------------------------------------+-------------+---------------------+-------------+---------------------+
| 1000000  | 東京都     | 千代田区     | 以下に掲載がない場合                 | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000001  | 東京都     | 千代田区     | 千代田                               | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000002  | 東京都     | 千代田区     | 皇居外苑                             | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000003  | 東京都     | 千代田区     | 一ツ橋(1丁目)                     | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000004  | 東京都     | 千代田区     | 大手町(次のビルを除く)             | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000005  | 東京都     | 千代田区     | 丸の内(次のビルを除く)             | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
| 1000006  | 東京都     | 千代田区     | 有楽町                               | system      | 2022-06-05 04:16:34 | system      | 2022-06-05 04:16:34 |
...

注意点

チャネルの解放しないと後続の処理が始まらないので注意。
また、WiteGroupをデクリメントしないと全体の処理が終わらないので、こちらも注意が必要。

defer wg.Done()
defer func(s chan struct{}){
	<-s // 処理が終わったらchannelを解放
}(slots)

最後に

簡単に並列処理、同時実行数の制御が出来るので、機会があれば是非、channel使ってみてください!!

※訂正や補足等あればコメント等いただけますとありがたいです。
最後まで読んでいただき、ありがとうございました。

11
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?