LoginSignup
1
0

並行処理を加えて修正してみた

Last updated at Posted at 2023-05-07

はじめに

以下URL記事ではGo言語で特定のカテゴリを含むURLをXMLファイルから抽出し、新しいXMLファイルに書き込む方法についてご紹介しました。今回の記事は学習がてらに並行処理を加えて以前のコードを修正してみました.....!!

実装

以下のコードが、以前のコードから並行処理を加えたものです。

package main

import (
	"bufio"
	"encoding/xml"
	"fmt"
	"os"
	"regexp"
	"sync"
)

/*
  以下の構造体は、XMLファイルのデータをGoの構造体に変換するために定義している。
  XMLファイルの構造をGoの構造体にマッピングすることで、Go言語でXMLデータを簡単に操作できる。
*/

// UrlSet構造体は、XMLファイルの<urlset>要素に対応
type UrlSet struct {
	XMLName xml.Name `xml:"urlset"`
	Urls    []Url    `xml:"url"`
}

// Url構造体は、XMLファイルの<url>要素に対応
type Url struct {
	XMLName xml.Name `xml:"url"`
	Loc     string   `xml:"loc"`
}

func main() {
	inputFile := "allPageUrl.xml"                                                     // 入力ファイル名
	categories := []string{"price", "company", "ranking", "office", "voice", "piano"} // 抽出するカテゴリ
	generateXMLFiles(inputFile, categories)                                           // カテゴリごとの.xmlファイルを生成する関数を呼び出す
}

// generateXMLFilesは、指定された入力ファイルとカテゴリのスライスを受け取り、各カテゴリの.xmlファイルを生成
func generateXMLFiles(inputFile string, categories []string) {
	categoryUrls := make(map[string][]Url)     // カテゴリごとにURLを格納するマップ
	categoryChans := make(map[string]chan Url) // カテゴリごとにチャネルを格納するマップ

	var mu sync.Mutex // Mutex

	xmlFile, err := os.Open(inputFile) // XMLファイルを開く
	if err != nil {
		fmt.Println("Error opening file:", err)
		return
	}
	defer xmlFile.Close() // 関数終了時にファイルを閉じる

	decoder := xml.NewDecoder(bufio.NewReader(xmlFile)) // XMLデコーダーを作成

	for _, category := range categories {
		categoryChans[category] = make(chan Url, 10) // カテゴリごとにチャネルを作成
	}

	// XMLデータをデコードし、各カテゴリチャネルに送信するgoroutine
	go func() {
		for {
			token, err := decoder.Token() // 次のXMLトークンを取得
			if err != nil {
				// エラーが発生したらすべてのチャネルを閉じてループを終了
				for _, ch := range categoryChans {
					close(ch)
				}
				break
			}

			switch elem := token.(type) {
			case xml.StartElement:
				if elem.Name.Local == "url" {
					var url Url
					err := decoder.DecodeElement(&url, &elem) // url要素をデコードしてUrl構造体に格納
					if err != nil {
						fmt.Println("Error decoding element:", err)
						return
					}
					for _, ch := range categoryChans {
						ch <- url // デコードしたurlを各カテゴリチャネルに送信
					}
				}
			}
		}
	}()

	var wg sync.WaitGroup   // WaitGroupの作成
	wg.Add(len(categories)) // 待機するgoroutineの数を設定

	categoryRegexps := make(map[string]*regexp.Regexp) // カテゴリごとの正規表現を格納するマップ
	for _, category := range categories {
		categoryRegexps[category] = regexp.MustCompile("/" + category + "/")
	}

	// カテゴリごとにurlを処理するgoroutine
	for _, category := range categories {
		go func(category string) {
			defer wg.Done() // 処理が完了したらWaitGroupのカウンタをデクリメント

			for url := range categoryChans[category] { // カテゴリチャネルからURLを受信
				if categoryRegexps[category].MatchString(url.Loc) { // URLがカテゴリの正規表現にマッチするかチェック
					mu.Lock()                                                    // Mutexでロック
					categoryUrls[category] = append(categoryUrls[category], url) // マッチしたURLをカテゴリURLに追加
					mu.Unlock()                                                  // Mutexのロックを解除
				}
			}
		}(category)
	}

	wg.Wait() // すべてのgoroutineが完了するまで待機

	// カテゴリごとにXMLファイルを作成
	for _, category := range categories {
		outputFile := category + ".xml"
		newUrlSet := UrlSet{Urls: categoryUrls[category]}
		newXmlData, err := xml.MarshalIndent(newUrlSet, "", "  ") // カテゴリごとのURLセットを整形してXMLにマーシャル
		if err != nil {
			fmt.Println("Error marshalling XML:", err)
			return
		}

		newXmlFile, err := os.Create(outputFile) // 新しいXMLファイルを作成
		if err != nil {
			fmt.Println("Error creating file:", err)
			return
		}
		defer newXmlFile.Close() // 関数終了時にファイルを閉じる

		newXmlFile.WriteString(xml.Header) // XMLヘッダを書き込む
		newXmlFile.Write(newXmlData)       // XMLデータを書き込む
	}
}

そもそも並行処理とは & 非同期処理との違いは

  • 並行処理は、複数のタスクが同時期に実行されることを意味します。これは、各タスクが独立して進行し、他のタスクとは交差しないことを示します。並行処理では、実際に同時に実行されているかどうかは問題ではなく、それらのタスクが互いに干渉しないように独立していることが重要です。Go言語では、goroutineを使用して並行処理を実装できます。

  • 非同期処理は、あるタスクが完了するのを待たずに、他のタスクが実行されることを意味します。つまり、1つのタスクが終わるまで次のタスクに進まない同期処理とは対象的です。非同期処理では、タスクの完了に時間がかかる場合でも、その間に他のタスクが進行できるため、全体の効率が向上することがあります。

  • 並行処理と非同期処理の違いは、並行処理が複数のタスクが独立して進行することを重視しているのに対し、非同期処理はタスクが完了するのを待たずに他のタスクを実行することを重視している点です。Go言語のgoroutineは、並行処理を簡単に実装できる仕組みであり、それによって非同期処理も実現できます。

  • つまり、Go言語では、並行処理の仕組み(goroutine)を利用して、非同期処理を実現しています。(Go言語のgoroutineを使用することで、複数のタスクを独立して進行させることができ、それらのタスクがお互いに待たずに実行されるため、非同期処理が実現されるから)

sync.WaitGroupとは

  • sync.WaitGroup は、Go言語の標準ライブラリである sync パッケージに含まれる構造体で、複数のgoroutineが完了するのを待つために使用されます。sync.WaitGroup を使用することで、複数の並行処理がすべて終了するまでプログラムの実行をブロックすることができます。これは、特に並行処理の結果がプログラムの他の部分で必要な場合や、すべての並行処理が終了する前にプログラムを終了させたくない場合に便利です。

  • 下記コードはWaitGroupを作成して、待機するgoroutineの数を設定しています。defer wg.Done()で処理が完了したらWaitGroupのカウンタをデクリメントするようにしています。ここでいう処理が完了するとはカテゴリチャネルからURLを受信しなくなった時です。WaitGroupのカウンタ数が0になったらwg.Wait()が解除されて次のテゴリごとにXMLファイルを作成する処理に進みます。

// ... 他の処理

var wg sync.WaitGroup   // WaitGroupの作成
wg.Add(len(categories)) // 待機するgoroutineの数を設定

// ... 他の処理

// カテゴリごとにurlを処理するgoroutine
	for _, category := range categories {
		go func(category string) {
			defer wg.Done() // 処理が完了したらWaitGroupのカウンタをデクリメント

			for url := range categoryChans[category] { // カテゴリチャネルからURLを受信
				if categoryRegexps[category].MatchString(url.Loc) { // URLがカテゴリの正規表現にマッチするかチェック
					mu.Lock()                                                    // Mutexでロック
					categoryUrls[category] = append(categoryUrls[category], url) // マッチしたURLをカテゴリURLに追加
					mu.Unlock()                                                  // Mutexのロックを解除
				}
			}
		}(category)
	}

wg.Wait() // すべてのgoroutineが完了するまで待機

// すべてのgoroutineが完了しないとこの次のカテゴリごとにXMLファイルを作成する処理に進めれないようにする

sync.Mutexとは

  • sync.Mutexは、Go言語で提供される排他制御(mutual exclusion)のプリミティブです。複数のゴルーチンが同時に共有リソースにアクセスすることを防ぐために使用されます。sync.Mutexを使うことで、一度に1つのゴルーチンだけが共有リソースにアクセスでき、他のゴルーチンは待機します。
    制御が必要な理由は、複数のゴルーチンが同時に共有リソースにアクセスすると、データの破損や競合状態(race condition)が発生する可能性があるためです。
    競合状態は、複数のゴルーチンが同じリソースに同時に読み書きを行い、結果が実行順序によって変わる状況を指します。これにより、プログラムの挙動が不安定になり、デバッグが困難になることがあります。
    排他制御(ミューテックスなど)を使用することで、共有リソースに対するアクセスを一度に1つのゴルーチンだけに制限し、データの整合性を保つことができます。これにより、競合状態やデータの破損を防ぐことができます。

  • sync.Mutexは、Lock()とUnlock()という2つの主要なメソッドを提供します。

  • Lock(): ミューテックスをロックします。他のゴルーチンが既にロックしている場合、そのゴルーチンがミューテックスを解除するまで待機します。

  • Unlock(): ミューテックスを解除します。待機中のゴルーチンのうち1つがミューテックスを取得できるようになります。

  • ミューテックスを使用する際は、共有リソースへのアクセス前にLock()を呼び出し、アクセス後にUnlock()を呼び出すことで、リソースへの同時アクセスを防ぐことができます。

// カテゴリごとにurlを処理するgoroutine
	for _, category := range categories {
		go func(category string) {
			defer wg.Done() // 処理が完了したらWaitGroupのカウンタをデクリメント

			for url := range categoryChans[category] { // カテゴリチャネルからURLを受信
				if categoryRegexps[category].MatchString(url.Loc) { // URLがカテゴリの正規表現にマッチするかチェック
					mu.Lock()                                                    // Mutexでロック
					categoryUrls[category] = append(categoryUrls[category], url) // マッチしたURLをカテゴリURLに追加
					mu.Unlock()                                                  // Mutexのロックを解除
				}
			}
		}(category)
	}

上記コードの以下の箇所について説明します

mu.Lock() // Mutexでロック
categoryUrls[category] = append(categoryUrls[category], url) // マッチしたURLをカテゴリURLに追加
mu.Unlock() // Mutexのロックを解除
  • mu.Lock() と mu.Unlock() で囲まれた部分はクリティカルセクションです。この部分は、一度に1つのgoroutineだけがアクセスできるようになっています。

  • mutexによるロックが行われると、他のgoroutineはクリティカルセクションにアクセスできなくなりますが、並行処理自体は完全に中断されるわけではありません。ロックがかかっている間、他のgoroutineはクリティカルセクションに入るのを待つ状態になりますが、その他の部分(例えば、カテゴリチャネルからURLを受信する部分)は引き続き並行して実行されます。

  • したがって、マッチしたgoroutineがクリティカルセクション内で処理を行っている間、他のgoroutineはその処理を待っていますが、完全に中断されているわけではありません。これにより、データの競合や一貫性の問題を防ぎながら、効率的な並行処理が実現されます。

まとめ

この記事では、Go言語で与えられたXMLファイルから特定のカテゴリに属するURLを抽出し、カテゴリごとに新しいXMLファイルを作成する方法を説明しました。sync.Mutex と sync.WaitGroup を使用して、複数のgoroutine間でデータと同期を管理する方法も示しました。この方法を応用することで、さまざまなデータ処理タスクを効率的に並行して実行することができます。(誤りがありましたら指摘していただきたいです....!)

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0