この記事は Go2 Advent Calendar 2018 の 24 日目の記事です。
私は組込ソフトエンジニアで、職場にはレガシーな環境が多く残っています。
そして、ビルドツールが古かったりして 2MB にも満たないバイナリを作るのに数十分かかったりしています。
時間がかかる主な理由は、複数 CPU による分散コンパイルが実現されてない(場合が多い)から、です。
ということで、 Go 言語の goroutine を用いて CPU をなるべく使う形のタスクランナーを書くことが多いわけですが、最近は gRPC 経由で複数のマシンを活用する分散ビルド環境を作っているのでまとめました。
動くサンプルを紹介しつつ、徐々に分散ビルドになるように段階的に進めていきます。
分散処理のイメージ
実際に仕事で使っているプロジェクトの分散 build は以下の画像のようになります。
1 CPU で普通にビルドすると 1200 秒かかりますが、分散ビルド (合計 22 CPU) することで 90 秒まで短くすることができました。
横軸は時間 (sec) で、縦軸はリソース (ローカルリソースである localWorker (6CPU) と、リモートリソースである pcA (10 CPU) と pcB (6CPU)) を表します。
処理を始めるとすぐに localWorker が仕事を始めます。
それと同時に pcA および pcB にビルドに必要なファイルを転送 (約3000ファイル 100MB) します。
pcA と pcB は約10秒経過後にファイル受け取りが完了し処理に加わっています。
上の例では、 pcB がわずかに早くファイル受け取りが完了しています。
localWorker(4) が最後に処理しているファイル (右端) は、他のすべてが終わってからしか作成できないファイルです。
pcB(4) が受け持つファイルが完了次第、処理をしています。
このように、隙間が空いている場所は何らかの依存関係があって処理待ちしている状況となります。
また localWorker(0) が処理しているファイルは、1ファイルで90秒近い処理時間となっています。
このファイルが全体のビルド時間を決めてしまっています。
題材
仕事で使っている環境は持ち出せないので、今回はダミーのコンパイラ(dummycc) とリンカ (dummyld) を用意しました。
それぞれの仕様は以下の通り。
- dummycc
- 入力したファイルの1行目に記載された時間をかけて処理する
- 2行目以降の文字列がある場合は、 warning として出力する
- dummyld
- 入力したファイルが存在しなければエラーとする
- 1秒かけてゆっくり処理する
テストに使うCソースは、 aa.c
~ ff.c
までの 36 ファイルですべてのコンパイルが終わってからリンクを行う想定です。
実際のファイルは こちら にあります。
想定する実行は以下の通り。
dummycc -o testdata/aa.o testdata/aa.c
dummycc -o testdata/ab.o testdata/ab.c
・・・
dummycc -o testdata/fe.o testdata/fe.c
dummycc -o testdata/ff.o testdata/ff.c
dummyld -o testdata/a.out testdata/aa.o testdata/ab.o ・・・
上記をもとに []*exec.Cmd
を作成、それをもとに処理を行っていきます。
具体的には下の関数で作成します。
func makeCmds() []*exec.Cmd {
xxx := `dummycc -o testdata/aa.o testdata/aa.c
dummycc -o testdata/ab.o testdata/ab.c
dummycc -o testdata/ac.o testdata/ac.c
// 省略
dummycc -o testdata/fe.o testdata/fe.c
dummycc -o testdata/ff.o testdata/ff.c
dummyld -o testdata/a.out testdata/aa.o testdata/ab.o testdata/ac.o testdata/ad.o testdata/ae.o testdata/af.o testdata/ba.o testdata/bb.o testdata/bc.o testdata/bd.o testdata/be.o testdata/bf.o testdata/ca.o testdata/cb.o testdata/cc.o testdata/cd.o testdata/ce.o testdata/cf.o testdata/da.o testdata/db.o testdata/dc.o testdata/dd.o testdata/de.o testdata/df.o testdata/ea.o testdata/eb.o testdata/ec.o testdata/ed.o testdata/ee.o testdata/ef.o testdata/fa.o testdata/fb.o testdata/fc.o testdata/fd.o testdata/fe.o testdata/ff.o
`
cmds := []*exec.Cmd{}
scanner := bufio.NewScanner(strings.NewReader(xxx))
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
cmds = append(cmds, &exec.Cmd{
Path: fields[0],
Args: fields,
})
}
return cmds
}
build01 : まずは分散せずに普通に build するパターン
シンプルなコードとしては以下の通り。
func build01(cmds []*exec.Cmd) {
for _, cmd := range cmds {
buf, _ := cmd.CombinedOutput()
fmt.Print(string(buf))
}
}
単純にループするだけなので特に難しい事はありません。
1 つの CPU だけを使って順番に処理するので、時間がかかります。
build02 : goroutine を湯水のように使ったパターン
CPU の core / threads 数を考えずに goroutine を湯水のように使ったパターンです。
最初に 36 並列で実行が始まり、すべてのコンパイルが終わってからリンクが始まる、という流れです。
今回の dummycc
は実際には一定時間 time.Sleep()
しているだけなので 36 並列でも問題はないのですが、通常は処理負荷が問題になるでしょう。
リンクする前にはコンパイルが終わっている必要があるので、 wg.Wait()
を使って直前までのコンパイルが全て終わるのを待ち合わせています。
(なお、この書き方だとコンパイルとリンクが交互に走るケース等で CPU を有効活用できませんがここでは無視します)
func build02(cmds []*exec.Cmd) {
var wg sync.WaitGroup
for _, cmd := range cmds {
cmd := cmd
if cmd.Path != dummyCc {
// コンパイラではない時は、直前までのコンパイルが終わるのを待つ
wg.Wait()
}
wg.Add(1)
go func() {
defer wg.Done()
buf, _ := cmd.CombinedOutput()
fmt.Print(string(buf))
}()
}
wg.Wait()
}
build03 : 分散数を指定しつつ処理するパターン
分散数を指定しつつ、の典型例としては cap 付の chan (↓の例では limit
) を用意して制御するパターンがあります。
これはうまく動作しますが、処理時の出力 (warning 等) が混ざる問題があるのでもう少し工夫する必要があります。
func build03(cmds []*exec.Cmd) {
var wg sync.WaitGroup
// *threads 分だけ cap を作っておく事で分散数を制御する
limit := make(chan struct{}, *threads)
for _, cmd := range cmds {
cmd := cmd
if cmd.Path != dummyCc {
// コンパイラではない時は、直前までのコンパイルが終わるのを待つ
wg.Wait()
}
limit <- struct{}{}
wg.Add(1)
go func() {
defer func() { <-limit }()
defer wg.Done()
buf, _ := cmd.CombinedOutput()
fmt.Print(string(buf))
}()
}
wg.Wait()
}
build04 : 分散数を指定しつつ出力をうまくやるパターン
出力を (混ざらず順序よく) うまくやるために、ここでは github.com/sago35/ochan を使いました。
ochan
を使うと ch := oc.GetCh()
を実行した順に出力する ch
を作り出すことができるので、並列実行しても順番通りに出力されます。
また、 oc.Wait()
すると oc.GetCh()
して取り出した ch
が全て閉じられるまで待つので、ほぼ sync.WaitGroup
のように使えます。
ochan
については、 umedago #3 の LT で話をしました。 → chan + 順序制御 = ochan
分散実行のイメージは、 build03 と同じです。
func build04(cmds []*exec.Cmd) {
outCh := make(chan string, 10000)
done := make(chan struct{})
go func() {
for ch := range outCh {
fmt.Print(ch)
}
close(done)
}()
limit := make(chan struct{}, *threads)
oc := ochan.NewOchan(outCh, 100)
for _, cmd := range cmds {
cmd := cmd
if cmd.Path != dummyCc {
// コンパイラではない時は、直前までのコンパイルが終わるのを待つ
oc.Wait()
}
limit <- struct{}{}
ch := oc.GetCh()
go func() {
defer func() { <-limit }()
defer close(ch)
buf, _ := cmd.CombinedOutput()
if len(buf) > 0 {
ch <- string(buf)
}
}()
}
oc.Wait()
close(outCh)
<-done
}
gRPC を用いた分散ビルド環境
基本的には上記で説明した goroutine を用いた処理を gRPC 上の処理にすれば良いわけですが、以下についての考慮が必要です。
- gRPC server 側はソースコードを持っていない
- gRPC server 側はコンパイル済み object (*.o) を持っていない
- 複数の gRPC server で実行する場合はすべての object を持っているわけではない
- gRPC の connection 内に複数の使用可能リソースがある
- gRPC 上のリソースと local リソースを同じように扱いたい
順番に解決していきます。
gRPC server 側はソースコードを持っていない
使いたい時に都度依存ファイルを送る方法も考えましたが、C言語プロジェクトの場合は #include
を解析する必要があるし、どのみちターゲットの C ソースよりも依存するファイルが新しいなら送りなおす等の処置も必要であまり効率的に書ける気がしませんでした。
なので、自分が選んだ方法は git ls-files の結果全て
or 引数で指定したファイルリスト
を分散ビルドの先頭で送るようにしました。
そうすると、今度はファイルを全て送るまではビルドが始まらないという問題があります。
この部分は、(ファイルを送らなくても実行開始できる) local リソースはすぐにビルドを始めつつ、並列で gRPC 経由でファイルを送信し準備ができ次第 gRPC リソースを使うようにしてビルド時間への影響を減らしました。
gRPC server 側はコンパイル済み object (*.o) を持っていない
コンパイル結果等は当然 local に送り返すようにするわけですが、その結果は local にはあるが全ての gRPC server で持っているわけではありません。
また、すべてのファイルの同期をとる意味もないので、 ↑ で最初に送ったファイル以外に依存がある場合は都度送るようにしました。
gRPC の connection 内に複数の使用可能リソースがある
↑ の build03 のようなパターンだと、すべてのローカルリソースは分け隔てなく同じものである、という前提で処理することができます。
が、 gRPC 経由の場合は、ある server に対しての connection は1つだがその中で複数の JOB を実行できる、というような状態になります。
もちろん、複数 connection を作成してもよいとは思いますがその場合でも、「分け隔てなく」という事はなく、それぞれ connection 先という情報を持つ必要があります。
gRPC server A からは 2 CPU 、 gRPC server B からは 3 CPU となると、合計2つの connection で合計5つの CPU を使える形で処理する必要があります。
これを実現する方法はいくつかあるかと思いますが、自分は github.com/sago35/limichan というライブラリを作成して実現しました。
以下のようなイメージで実装することができます。
func limichan_sample() {
l, _ := limichan.New(context.Background())
// gwA からは 2 CPU
gwA := newGrpcWorker(addresInfoA)
l.AddWorker(gwA)
l.AddWorker(gwA)
// gwB からは 3 CPU
gwB := newGrpcWorker(addresInfoB)
l.AddWorker(gwB)
l.AddWorker(gwB)
l.AddWorker(gwB)
for _, job := range jobs {
// worker がある限りは並列実行し、無ければブロックする
// worker は gwA の場合もあれば gwB の場合もある
l.Do(job)
}
// すべての l.Do(job) が完了するのを待つ
err := l.Wait()
if err != nil {
log.Fatal(err)
}
}
gRPC 上のリソースと local リソースを同じように扱いたい
上記の limichan
の l.AddWorker()
は以下の interface を満たす worker は登録可能です。
なので、 gRPC 上のリソースと同じく local のリソースも interface を満たすように実装すれば良いです。
type Worker interface {
Do(context.Context, Job) error
}
gRPC proto
ということで、次は gRPC サービスを作っていきます。
各関数の説明は後述。
ソース全般は以下のあたりにあります。
- proto ファイル
- https://github.com/sago35/grpcbuild
- server 側の実装
- https://github.com/sago35/grpcbuild/tree/master/cmd/server
syntax = "proto3";
package grpcbuild;
service GrpcBuild {
rpc Init(InitRequest) returns (InitResponse) {}
rpc Send(SendRequest) returns (SendResponse) {}
rpc Exec(ExecRequest) returns (ExecResponse) {}
}
message InitRequest {
string Dir = 1;
}
message InitResponse {
}
message File {
string Filename = 1;
string Dir = 2;
bytes Data = 3;
}
message SendRequest {
repeated File Files = 1;
}
message SendResponse {
}
message Cmd {
string Path = 1;
repeated string Args = 2;
repeated string Env = 3;
string Dir = 4;
}
message ExecRequest {
repeated Cmd Cmds = 1;
repeated string Files = 2;
}
message ExecResponse {
int32 ExitCode = 1;
bytes Stdout = 2;
bytes Stderr = 3;
repeated File Files = 4;
}
Init()
service GrpcBuild {
rpc Init(InitRequest) returns (InitResponse) {}
}
message InitRequest {
string Dir = 1;
}
message InitResponse {
}
Init() は引数 Dir によりサンドボックス (の雰囲気の作業ディレクトリ) を作成します。
毎回 Init() する度にすべてのファイルを削除して処理をし直すイメージです。
(本当はうまくサンドボックス化したいのですが、できていません)
Send()
service GrpcBuild {
rpc Send(SendRequest) returns (SendResponse) {}
}
message File {
string Filename = 1;
string Dir = 2;
bytes Data = 3;
}
message SendRequest {
repeated File Files = 1;
}
message SendResponse {
}
Send() は以下の2つの目的で使用します。
- Init() 直後に
gRPC server 側はソースコードを持っていない
への対策としての送信 - Exec() 直前に
gRPC server 側はコンパイル済み object (*.o) を持っていない
への対策としての送信
streaming RPC にしてもよいですが、メモリ使用量が大きくなりがち (複数の接続先に並列に実行するとすぐ数 GB 超になる) なのでうまく使う必要があります。
Exec()
service GrpcBuild {
rpc Exec(ExecRequest) returns (ExecResponse) {}
}
message File {
string Filename = 1;
string Dir = 2;
bytes Data = 3;
}
message Cmd {
string Path = 1;
repeated string Args = 2;
repeated string Env = 3;
string Dir = 4;
}
message ExecRequest {
repeated Cmd Cmds = 1;
repeated string Files = 2;
}
message ExecResponse {
int32 ExitCode = 1;
bytes Stdout = 2;
bytes Stderr = 3;
repeated File Files = 4;
}
そのまま os/exec.Cmd
に渡して処理できるような形で作成していて、コンパイル/リンクで使用します。
使用できる実行体を制限する等の処置を行った方が無難ですが、ここでは何でも実行できる形で作成しています。
ExecResponse
で処理結果のファイル (*.o や a.out 等) を返すように作っています。
処理の流れ
「Init()」 → 「Send()」 → 「Exec() を必要回数繰り返す」 が基本となります。
が、前述の通りリンカ実行前等は依存ファイルを送信する必要があるので 「Send() + Exec()」という形になります。
具体的なコードは、次項の build06.go
を確認してください。
build06 : gRPC を用いた分散ビルドパターン (ただしリモートのみ)
ソースコードの全貌はリンク先を見てください。
以下のように、 newWorker()
で gRPC server に接続し処理を行います。
github.com/sago35/limichan
を使っている以外は、今までのコードとさほど変わりません。
func build06(cmds []*exec.Cmd) {
outCh := make(chan string, 10000)
done := make(chan struct{})
go func() {
for ch := range outCh {
fmt.Print(ch)
}
close(done)
}()
l, _ := limichan.New(context.Background())
w, _ := newWorker(`127.0.0.1`, 12345, *threads)
for i := 0; i < *threads; i++ {
l.AddWorker(w)
}
oc := ochan.NewOchan(outCh, 100)
for _, cmd := range cmds {
cmd := cmd
if cmd.Path != dummyCc {
// コンパイラではない時は、直前までのコンパイルが終わるのを待つ
oc.Wait()
}
j := &job{
cmd: cmd,
ch: oc.GetCh(),
outFile: cmd.Args[2:3],
depFile: cmd.Args[3:],
}
l.Do(j)
}
oc.Wait()
l.Wait()
close(outCh)
<-done
}
build07 : ローカルとリモート (gRPC) の両方を用いた分散ビルドパターン
作ったサンプルだと分かりにくいですが協調して分散ビルドする例です。
先ほどの build06 との差分は、 newLocalWorker()
でローカルリソースを登録した後、 goroutine で gRPC の worker を追加しているところです。
このやり方により、ローカルリソースはすぐに処理をはじめ、リモートは接続等の時間のかかる処理が終わり次第分散ビルドに参加します。
こういうのが簡単に書けるのが Go の良い所ですね。
ここでは、 gRPC 接続+ファイル転送
に時間がかかるイメージで1秒 wait させているので、 ↑ の画像において 127.0.0.1
側の開始が遅いです。
準備ができ次第、ビルドを開始できているのが分かります。
func build07(cmds []*exec.Cmd) {
outCh := make(chan string, 10000)
done := make(chan struct{})
go func() {
for ch := range outCh {
fmt.Print(ch)
}
close(done)
}()
l, _ := limichan.New(context.Background())
w, _ := newLocalWorker()
for i := 0; i < *threads; i++ {
l.AddWorker(w)
}
go func() {
w, _ := newWorker(`127.0.0.1`, 12345, *threads)
// gRPC 接続に時間がかかるのを模擬するため1秒待つ
time.Sleep(1 * time.Second)
for i := 0; i < *threads; i++ {
l.AddWorker(w)
}
}()
oc := ochan.NewOchan(outCh, 100)
for _, cmd := range cmds {
cmd := cmd
if cmd.Path != dummyCc {
// コンパイラではない時は、直前までのコンパイルが終わるのを待つ
oc.Wait()
}
j := &job{
cmd: cmd,
ch: oc.GetCh(),
outFile: cmd.Args[2:3],
depFile: cmd.Args[3:],
}
l.Do(j)
}
oc.Wait()
l.Wait()
close(outCh)
<-done
}
まとめ
駆け足で分散ビルドまで紹介しました。
言葉が足りない部分がたくさんあるかと思いますが、雰囲気は伝わるかと思います。
実際に作ってみて発見があったのは以下です。
- 最初に全ファイル転送する形で実施しても、分散の恩恵は得られる
- streaming RPC でファイル送信を行うと、かなりメモリ消費が大きい (GB 単位)
-
[]*exec.Cmd
のようなものさえ作れれば、後は goroutine で適当に回せるので Go は本当に楽
Go は本当に楽でいいです。
そして、分散ビルドはとても楽しいので是非試してみてください。
ソースコードは以下にあります。
おまけ : 分散処理を行う OSS
今回の内容は OSS でほぼ同じことができるかと思います。
が、私の境遇としてはコンパイラ等が Windows 縛りなので色々諦めている状況です。
実は Windows でも動くよ等の良い情報があったら教えてほしいです。
- distcc/distcc: distributed builds for C, C++ and Objective C
- Bazel - a fast, scalable, multi-language and extensible build system" - Bazel
おまけ2 : 分散処理の可視化のやり方
今回の画像は Google Charts の Timelines
を用いて作成しました。
始まりと終わりの時間を指定しつつグルーピングを指定すると後は良い感じに見せてくれます。
今回の用途で使う場合は 59秒 → 60秒というタイミングで0秒に戻る感じで表示されるので注意が必要です。
上の画像を生成するための html ソースは以下になります。
https://gist.github.com/sago35/792f50b4773c2e8c8ba6aea72e92ef50
おまけ3 : umedago での登壇資料
2019/01/18 の umedago で登壇に使った資料はこちらです。
本記事の補足として参照することができます。