LoginSignup
13
15

More than 5 years have passed since last update.

Python / Golangによる並行処理プログラミングのお勉強

Last updated at Posted at 2018-12-31

最近、Pythonでシステムプログラミングすることが多くなり、Golangに接する機会が全くなくなってしまったので、自分自身のGolangプログラミング知識を養う意味で、並列処理プログラミングの基本に触れてみた。

Golangの並列処理プログラミングのお勉強の素材としては、
以前のQiita記事 "PythonでのノンブロックングI/OなEventlet動作を試してみた" を参考にして、サンプルアプリを移植することにしました。

◼️ プラクティス「並行処理を複数起動してみる」

⬜︎ 1stステップ (まずは、基本動作の確認)

(1) Pythonでの実装事例

PythonでのノンブロックングI/Oなプログラミングとして、Eventlet活用によるユーザ側スレッド(グリーンスレッド)を単体で操作させるだけのサンプルプログラムの再掲です。

sample1.py
import eventlet
import datetime
import time
eventlet.monkey_patch()

def _sample_processing():
    print("### Sample processing has started")
    for _ in range(5):
        print(datetime.datetime.today())
        time.sleep(1)
    print("### Sample processing has finished")


def start_sample():
    opthread = eventlet.spawn(_sample_processing)
    opthread.wait()


if __name__ == '__main__':
    start_sample()

動かしてみた

$ python sample1.py 
### Sample processing has started
2019-01-05 08:30:50.538612
2019-01-05 08:30:51.543222
2019-01-05 08:30:52.548203
2019-01-05 08:30:53.553529
2019-01-05 08:30:54.555284
### Sample processing has finished

(2) Golangでの実装事例

goroutineを活用したサンプルプログラムです。

sample1.go
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func _sampleProcessing() {
    defer wg.Done()
    fmt.Println("### Sample processing has started")
    for count := 0; count < 5; count++ {
        fmt.Printf("%s\n", time.Now())
        time.Sleep(1 * time.Second)
    }
    fmt.Println("### Sample processing has finished")
}

func startSample() {
    wg.Add(1)
    go _sampleProcessing()
    wg.Wait()
}

func main() {
    startSample()
}

動かしてみた

$ go run sample1.go 
### Sample processing has started
2019-01-05 08:37:33.082434 +0900 JST m=+0.000447223
2019-01-05 08:37:34.087662 +0900 JST m=+1.005713428
2019-01-05 08:37:35.090421 +0900 JST m=+2.008500414
2019-01-05 08:37:36.090829 +0900 JST m=+3.008943606
2019-01-05 08:37:37.095416 +0900 JST m=+4.013559257
### Sample processing has finished

はい、同じような結果になりました。

⬜︎ 2ndステップ (続いて、並行動作の確認)

(1) Pythonでの実装事例

つづいて、グリーンスレッドを複数動作させて、動作を連携させたい場合を想定して、少し、拡張。。。

sample2.py
import eventlet
import datetime
import time
eventlet.monkey_patch()

def _sample_processing():
    print("### Sample processing has started")
    for _ in range(5):
        print(datetime.datetime.today())
        time.sleep(1)
    print("### Sample processing has finished")


def start_sample():
    opthread = eventlet.spawn(_sample_processing)
    finish_event = eventlet.event.Event()

    def thread_finished(thread, event):
        print("### Sample processing notification")
        event.send()
    opthread.link(thread_finished, finish_event)

    time.sleep(0)
    _sample_processing_monitor(finish_event)


def _sample_processing_monitor(finish_event):
    while True:
        if not finish_event.ready():
            print("+++ Sample processing is still running!!")
        else:
            print("+++ Sample processing is done!!")
            break
        time.sleep(1)


if __name__ == '__main__':
    start_sample()

動かしてみた

$ python sample2.py 
### Sample processing has started
2019-01-05 08:40:33.042171
+++ Sample processing is still running!!
2019-01-05 08:40:34.043925
+++ Sample processing is still running!!
2019-01-05 08:40:35.044192
+++ Sample processing is still running!!
2019-01-05 08:40:36.049492
+++ Sample processing is still running!!
2019-01-05 08:40:37.051525
+++ Sample processing is still running!!
### Sample processing has finished
### Sample processing notification
+++ Sample processing is done!!

いい感じで、_sample_processing側のグリーンスレッドの動作完了が観測できるようになった。

(2) Golangでの実装事例

goroutine + channelを活用して、Eventlet動作と同等の並行処理を実現してみました。

sample2.go
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func _sampleProcessing(quit chan bool) {
    fmt.Println("### Sample processing has started")
    for count := 0; count < 5; count++ {
        fmt.Printf("%s\n", time.Now())
        time.Sleep(1 * time.Second)
    }
    fmt.Println("### Sample processing has finished")
    quit <- true
}

func startSample() {
    quit := make(chan bool)
    done := make(chan interface{})
    wg.Add(1)
    go _sampleProcessing(quit)
    go _sampleProcessingMonitor(done)
    <-quit
    fmt.Println("### Sample processing notification")
    close(done)
    wg.Wait()
}

func _sampleProcessingMonitor(done <-chan interface{}) {
    defer wg.Done()
    for {
        select {
        case <-done:
            fmt.Println("+++ Sample processing is done!!")
            return
        default:
            fmt.Println("+++ Sample processing is still running!!")
        }
        time.Sleep(1 * time.Second)
    }
}

func main() {
    startSample()
}

動かしてみた

$ go run sample2.go 
+++ Sample processing is still running!!
### Sample processing has started
2019-01-05 14:31:24.171169 +0900 JST m=+0.000435979
+++ Sample processing is still running!!
2019-01-05 14:31:25.176254 +0900 JST m=+1.005549760
+++ Sample processing is still running!!
2019-01-05 14:31:26.179963 +0900 JST m=+2.009292313
+++ Sample processing is still running!!
2019-01-05 14:31:27.180125 +0900 JST m=+3.009481765
+++ Sample processing is still running!!
2019-01-05 14:31:28.181371 +0900 JST m=+4.010759225
+++ Sample processing is still running!!
### Sample processing has finished
### Sample processing notification
+++ Sample processing is done!!

期待通りに、Pythonの実装事例の場合と、同じような挙動になりました

⬜︎ 3rdステップ (並行処理プログラム間でのデータ受け渡し)

(1) Pythonでの実装事例

ここから、実践的なコードにチャレンジしてみます。
OpenStackなどのPythonプログラミングでは、queue経由でのグリーンスレッド間でのデータ受け渡しを行うことが多いです。
そこで、現在時刻情報をqueue経由で受け渡しできるように、先ほどのサンプルコード"sample2.py"を、さらに改造してみます。

sample3.py
import eventlet
import datetime
import time
eventlet.monkey_patch()

class Measurement_Time_of_day(object):
    def __init__(self):
        self.queue = eventlet.queue.Queue()

    def get_time_of_day(self):
        data = datetime.datetime.today()
        self.queue.put(data)

def _sample_processing(instance):
    print("### Sample processing has started")
    for _ in range(5):
        instance.get_time_of_day()
        time.sleep(1)
    print("### Sample processing has finished")


def start_sample():
    instance = Measurement_Time_of_day()
    opthread = eventlet.spawn(_sample_processing, instance)
    finish_event = eventlet.event.Event()

    def thread_finished(thread, event):
        print("### Sample processing notification")
        event.send()
    opthread.link(thread_finished, finish_event)

    time.sleep(0)
    _sample_processing_monitor(finish_event, instance)


def _sample_processing_monitor(finish_event, instance):
    while True:
        if not finish_event.ready():
            while not instance.queue.empty():
                data = instance.queue.get()
                print("+++ Sample processing is running!! [{}]".format(data))
                if instance.queue.empty():
                    break
            time.sleep(0)
        else:
            print("+++ Sample processing is done!!")
            break


if __name__ == '__main__':
    start_sample()

動かしてみた

$ python sample3.py 
### Sample processing has started
+++ Sample processing is running!! [2019-01-05 08:53:29.326131]
+++ Sample processing is running!! [2019-01-05 08:53:30.326178]
+++ Sample processing is running!! [2019-01-05 08:53:31.326213]
+++ Sample processing is running!! [2019-01-05 08:53:32.326248]
+++ Sample processing is running!! [2019-01-05 08:53:33.326282]
### Sample processing has finished
### Sample processing notification
+++ Sample processing is done!!

いい感じに、動作できていますね。

(2) Golangでの実装事例

goroutine + channelを活用して、Eventlet動作と同等の並行処理を実現してみました。
ここでのポイントは、現在時刻情報をchannel経由で受け渡しできるように、先ほどのサンプルコード"sample2.go"を、さらに改造しているのですが、プログラム構造は、ほとんど変更することなく、実現できている点ですね。
Golangでの並行処理プログラミングの強みが理解できた気がします。

sample3.go
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func _sampleProcessing(queue chan time.Time, quit chan bool) {
    fmt.Println("### Sample processing has started")
    for count := 0; count < 5; count++ {
        queue <- time.Now()
        time.Sleep(1 * time.Second)
    }
    fmt.Println("### Sample processing has finished")
    quit <- true
}

func startSample() {
    quit := make(chan bool)
    queue := make(chan time.Time)
    done := make(chan interface{})
    wg.Add(1)
    go _sampleProcessing(queue, quit)
    go _sampleProcessingMonitor(done, queue)
    <-quit
    fmt.Println("### Sample processing notification")
    close(done)
    wg.Wait()
}

func _sampleProcessingMonitor(done <-chan interface{}, queue chan time.Time) {
    defer wg.Done()
    for {
        select {
        case data := <-queue:
            fmt.Printf("+++ Sample processing is running!! [%s]\n", data)
        case <-done:
            fmt.Println("+++ Sample processing is done!!")
            return
        }
    }
}

func main() {
    startSample()
}

動かしてみた

$ go run sample3.go 
### Sample processing has started
+++ Sample processing is running!! [2019-01-05 14:32:42.082169 +0900 JST m=+0.000783936]
+++ Sample processing is running!! [2019-01-05 14:32:43.083297 +0900 JST m=+1.001940778]
+++ Sample processing is running!! [2019-01-05 14:32:44.084343 +0900 JST m=+2.003020791]
+++ Sample processing is running!! [2019-01-05 14:32:45.085458 +0900 JST m=+3.004164318]
+++ Sample processing is running!! [2019-01-05 14:32:46.086591 +0900 JST m=+4.005328912]
### Sample processing has finished
### Sample processing notification
+++ Sample processing is done!!

こちらも、期待通りに、Pythonの実装事例の場合と、同じような挙動になりました。

⬜︎ 4thステップ (並行処理プログラムの多重度を増やす)

O'Reilly本の"Go言語による並行処理"で、もう少し、goroutine / channelの知見が溜まったので、さらに、並行処理プログラムの多重度を増やしてみました。

(1) Pythonでの実装事例

ちょっと、これ以上は、実装が煩雑になりそうなので、スキップします。

(2) Golangでの実装事例

並列処理は、複雑になっているはずなのに、可読性は、あまり低下していないのが、素晴らしいです。

sample4.go
package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func _sampleProcessing1(queue1 chan time.Time) {
    defer close(queue1)
    fmt.Println("### Sample processing1 has started")
    for count := 0; count < 5; count++ {
        queue1 <- time.Now()
        time.Sleep(1 * time.Second)
    }
    fmt.Println("### Sample processing1 has finished")
}

func _sampleProcessing2(queue2 chan int) {
    defer close(queue2)
    fmt.Println("=== Sample processing2 has started")
    for count := 0; count < 10; count++ {
        queue2 <- count
        time.Sleep(1 * time.Second)
    }
    fmt.Println("=== Sample processing2 has finished")
}

func startSample() {
    quit := make(chan bool)
    queue1 := make(chan time.Time)
    queue2 := make(chan int)
    wg.Add(1)
    go _sampleProcessing1(queue1)
    go _sampleProcessing2(queue2)
    go _sampleProcessingMonitor(queue1, queue2, quit)
    <-quit
    fmt.Println("### Sample processing notification")
    wg.Wait()
}

func _sampleProcessingMonitor(queue1 chan time.Time, queue2 chan int, quit chan bool) {
    defer func() {
        wg.Done()
        fmt.Println("+++ Sample processing is done!!")
    }()
    for queue1 != nil || queue2 != nil {
        select {
        case data1, ok := <-queue1:
            if ok {
                fmt.Printf("+++ Sample processing1 is running!! [%s]\n", data1)
            } else {
                queue1 = nil
            }
        case data2, ok := <-queue2:
            if ok {
                fmt.Printf("+++ Sample processing2 is running!! [%d]\n", data2)
            } else {
                queue2 = nil
            }
        }
    }
    quit <- true
}

func main() {
    startSample()
}

動かしてみた

$ go run sample4.go 
=== Sample processing2 has started
### Sample processing1 has started
+++ Sample processing2 is running!! [0]
+++ Sample processing1 is running!! [2019-01-05 09:04:00.899713 +0900 JST m=+0.000449467]
+++ Sample processing1 is running!! [2019-01-05 09:04:01.899787 +0900 JST m=+1.000551478]
+++ Sample processing2 is running!! [1]
+++ Sample processing2 is running!! [2]
+++ Sample processing1 is running!! [2019-01-05 09:04:02.900975 +0900 JST m=+2.001767922]
+++ Sample processing1 is running!! [2019-01-05 09:04:03.902062 +0900 JST m=+3.002886144]
+++ Sample processing2 is running!! [3]
+++ Sample processing2 is running!! [4]
+++ Sample processing1 is running!! [2019-01-05 09:04:04.903236 +0900 JST m=+4.004089263]
### Sample processing1 has finished
+++ Sample processing2 is running!! [5]
+++ Sample processing2 is running!! [6]
+++ Sample processing2 is running!! [7]
+++ Sample processing2 is running!! [8]
+++ Sample processing2 is running!! [9]
=== Sample processing2 has finished
+++ Sample processing is done!!
### Sample processing notification

こちらも、期待通りに動作しています。

◼️ プラクティス「並行処理プログラム間で処理連携してみる」

⬜︎ 5th ステップ (並行処理をパイプラインで直列に連携させる)

O'Reilly本の"Go言語による並行処理"
4.6項「パイプライン」のサンプルコードを参考にして、Golangでの並行処理プログラム間をパイプラインで連携させてみました。

今回のパイプライン処理連携は、次のような流れになります。

  • getTimeUTC関数にて、現在のUTC時刻を、5回取得する(その後、敢えて1秒ウエイトしておく)
  • convertTimeJST関数にて、UTC時刻をJST時刻に変換する
  • JST時刻を出力する
sample5.go
package main

import (
    "fmt"
    "time"
)

func getTimeUTC(
    done <-chan interface{},
    fn func() time.Time,
) <-chan time.Time {
    defer fmt.Println("### getUtcTime has finished")
    valueStream := make(chan time.Time)
    fmt.Println("### getUtcTime has started")
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

func convertTimeJST(
    done <-chan interface{},
    valueStream <-chan time.Time,
    loopNum int,
) <-chan time.Time {
    defer fmt.Println("=== convertJstTime has finished")
    changeStream := make(chan time.Time)
    fmt.Println("=== convertJstTime has started")
    go func() {
        defer close(changeStream)
        for count := 0; count < loopNum; count++ {
            select {
            case <-done:
                return
            case nowUTC := <-valueStream:
                jst := time.FixedZone("Asia/Tokyo", 9*60*60)
                nowJST := nowUTC.In(jst)
                changeStream <- nowJST
            }
        }
    }()
    return changeStream
}

func nowTime() time.Time {
    data := time.Now().UTC()
    time.Sleep(1 * time.Second)
    return data
}

func startSample() {
    fmt.Println("*** startSample has started")
    done := make(chan interface{})
    defer func() {
        fmt.Println("*** startSample has finished")
        close(done)
    }()
    for nowJST := range convertTimeJST(done, getTimeUTC(done, nowTime), 5) {
        fmt.Println(nowJST)
    }
}

func main() {
    start := time.Now()
    startSample()
    fmt.Printf("total execution time: %v\n", time.Since(start))
}

動かしてみた

$ go run sample5.go 
*** startSample has started
### getUtcTime has started
### getUtcTime has finished
=== convertJstTime has started
=== convertJstTime has finished
2019-01-07 06:19:00.681788 +0900 Asia/Tokyo
2019-01-07 06:19:01.682916 +0900 Asia/Tokyo
2019-01-07 06:19:02.684057 +0900 Asia/Tokyo
2019-01-07 06:19:03.684652 +0900 Asia/Tokyo
2019-01-07 06:19:04.685858 +0900 Asia/Tokyo
*** startSample has finished
total execution time: 5.005599974s

期待通りに、動作していますね。
ただ、getTimeUTC関数は、常時、1秒待ってから終了するため、最終的に、サンプルアプリが終了するには、5秒程度の時間が必要になりました。

⬜︎ 6th ステップ (並行処理パイプラインに、ファンアウト、ファンインを適用する)

O'Reilly本の"Go言語による並行処理"
4.7項「ファンアウト、ファンイン」のサンプルコードを参考にして、Golangでの並行処理パイプラインにファンアウト、ファンインを適用してみました。

今回のパイプライン処理連携も、5thステップと同様に、次のような流れになります。

  • getTimeUTC関数にて、現在のUTC時刻を、5回取得する(その後、敢えて1秒ウエイトしておく)
  • convertTimeJST関数にて、UTC時刻をJST時刻に変換する
  • JST時刻を出力する
sample6.go
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func getTimeUTC(
    done <-chan interface{},
    fn func() time.Time,
) <-chan time.Time {
    defer fmt.Println("### getUtcTime has finished")
    valueStream := make(chan time.Time)
    fmt.Println("### getUtcTime has started")
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

func convertTimeJST(
    done <-chan interface{},
    valueStream <-chan time.Time,
    loopNum int,
) <-chan time.Time {
    defer fmt.Println("=== convertJstTime has finished")
    changeStream := make(chan time.Time)
    fmt.Println("=== convertJstTime has started")
    go func() {
        defer close(changeStream)
        for count := 0; count < loopNum; count++ {
            select {
            case <-done:
                return
            case nowUTC := <-valueStream:
                jst := time.FixedZone("Asia/Tokyo", 9*60*60)
                nowJST := nowUTC.In(jst)
                changeStream <- nowJST
            }
        }
    }()
    return changeStream
}

func nowTime() time.Time {
    data := time.Now().UTC()
    time.Sleep(1 * time.Second)
    return data
}

func fanIn(
    done <-chan interface{},
    channels ...<-chan time.Time,
) <-chan time.Time {
    var wg sync.WaitGroup
    rendezvousStream := make(chan time.Time)

    rendezvous := func(c <-chan time.Time) {
        defer wg.Done()
        for i := range c {
            select {
            case <-done:
                return
            case rendezvousStream <- i:
            }
        }
    }
    wg.Add(len(channels))
    for _, c := range channels {
        go rendezvous(c)
    }

    go func() {
        wg.Wait()
        close(rendezvousStream)
    }()
    return rendezvousStream
}

func startSample() {
    fmt.Println("*** startSample has started")
    done := make(chan interface{})
    defer func() {
        fmt.Println("*** startSample has finished")
        close(done)
    }()
    numFinders := runtime.NumCPU()
    finders := make([]<-chan time.Time, numFinders)
    for i := 0; i < numFinders; i++ {
        finders[i] = getTimeUTC(done, nowTime)
    }

    for nowJST := range convertTimeJST(done, fanIn(done, finders...), 5) {
        fmt.Println(nowJST)
    }
}

func main() {
    start := time.Now()
    startSample()
    fmt.Printf("total execution time: %v\n", time.Since(start))
}

動かしてみた

$ go run sample6.go 
*** startSample has started
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
### getUtcTime has started
### getUtcTime has finished
=== convertJstTime has started
=== convertJstTime has finished
2019-01-07 06:38:17.050149 +0900 Asia/Tokyo
2019-01-07 06:38:17.050147 +0900 Asia/Tokyo
2019-01-07 06:38:17.050131 +0900 Asia/Tokyo
2019-01-07 06:38:17.050162 +0900 Asia/Tokyo
2019-01-07 06:38:17.050101 +0900 Asia/Tokyo
*** startSample has finished
total execution time: 1.001763478s

期待通りに、動作していますね。
今度は、getTimeUTC関数は、常時、1秒待ってから終了するにも関わらず、最終的に、全体処理は1秒程度で完了することができました。ただし、現在時刻の出力結果から、nowTime関数が起動された順番は、保証されていない事実が判明しました。

O'Reilly本の"Go言語による並行処理"でも、次のような言及がなされていますね。

ファンインとファンアウトのアルゴリズムのナイーブな実装は結果の順番が重要でない場合にのみにうまく動作します。

13
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
15